ELT made easier!
Over the past few decades, data engineering has undergone a remarkable evolution, transforming from traditional ETL processes to modern ELT approaches. This evolution has been driven by the need for more efficient data integration processes, as well as advancements in data storage and processing technologies. In this article, we will explore some challenges we faced and how we used some key features of our Modern Data Stack to ease our operational ELT processes.
From ETL…
ETL stands for Extract, Transform, and Load, which refers to the process of extracting data from various sources, transforming it to fit specific requirements, and loading it into a target data storage system. One of the main challenges data engineers faced when implementing ETL processes was the long development cycles using custom code, which was time-consuming. Additionally, changes to data sources or business requirements would often require significant modifications to the ETL pipelines, which further prolonged development cycles.
Another challenge was the difficulty in maintaining ETL pipelines. As data sources and business requirements changed over time, ETL pipelines needed to be updated accordingly. However, changes to ETL pipelines could introduce errors or cause unexpected behavior, making it challenging to maintain their integrity and reliability.
To ELT!
The rise of big data played a significant role in the evolution of data engineering. As data volumes increased exponentially, traditional ETL processes were no longer sufficient for handling the scale and complexity of big data. New technologies such as Snowflake or Spark emerged, enabling data engineers to process and analyze large datasets in a distributed and parallelized manner.
To address new challenges, data engineers began adopting a new approach to data integration called ELT, which stands for Extract, Load, and Transform. In ELT, data is extracted from various sources and loaded into a target data storage system. The transformation is then applied to the data in the target system, rather than during the ETL process. This approach reduces the complexity and development cycles of data integration pipelines, making them easier to build and maintain.
What we used at Jolimoi
In our case we rely on what we call a Modern Data Stack. Apply to our own ELT processes is it what our stack look like:
- Airbyte to extract raw data. Airbyte proposes a large out-of-box library of connectors to extract data and moved it where you want
- In our case, this raw data will be loaded on s3 without any further transformation or changes (except using compressed Parquet). We try to stick to “vanilla” Airbyte the most we can. Here, Airbyte can spot for us any non-breaking changes on the data schema (for example an API or a database) and can easily be fired and scheduled by using Airflow
- Once this data resides on s3, and before using dbt to transform them, we used Databricks auto-loader.
And so, what is Databricks auto-loader? It's a feature which helps us to automatically infer data schema and load data with the minimum amount of work possible into our lakehouse.
In order to help us reduce the maximum amount of operational work, we have also adapted the dbt-codegen to generate dbt sources for us and really simplify our EL(T) framework to the minimum set of tools.
How does schema inference work in Databricks Autoloader?
To explain more in detail what Databricks Autoloader is and how we leverage it, let’s talk about a common issue. To make it simple, Autoloader is an optimized cloud file source for Apache Spark that loads data continuously and efficiently from cloud storage as new data arrives. It also stores Metadata about what has been read.
Sometimes you have control over your data source. Most of the time you don’t and the schema’s data source can change. The issue we often face can happen after an update of a data source. So now, Imagine you have data injected on an s3 on a daily basis with such columns.
Here you can set up the Databricks Autoloader to get data from your S3 bucket and insert it in a Delta Lake table. Alright, you work with it, everything looks fine for a few weeks but suddenly, you need to add an email address column.
So now your data looks like :
Note: Airbyte has a schema inference system, but it won’t be covered in this article.
But first, let’s look at how to set up the Autloader. Configuring it isn’t hard, the main challenge here is to know your data source well and configure it accordingly.
Here is an Autoloader configuration example in Python adapted with Airbyte sink (here output are in JSON but it works in many other format):
(spark.readStream
.format("cloudFiles") # Autoloader use the cloudFiles format
.option("cloudFiles.format", "json") # Data source is in json
.option("cloudFiles.schemaLocation", target_table_path) # Path where to write the data
.option("cloudFiles.inferColumnTypes", True) # Very important to add this line, otherwise _airbyte_data field is considered as a string and not a JSON
.option("cloudFiles.schemaEvolutionMode","addNewColumns") # We talk about it right after
.load(source_table_path) # Data source path
.select("_airbyte_data.*", "year", "month", "day", input_file_name().alias("source_file")) # Flatten data from airbyte JSON into columns (including the hive folders structure)
.writeStream
.option("checkpointLocation", target_table_path)
.option("mergeSchema", True) # Whenever a schema change is detected, we merge it
.partitionBy("year", "month", "day") # In order to query the data faster, we partition our data by year/month/day
.trigger(availableNow=True) # This line trigger it only once (not continuously)
.toTable("bronze." + schema + "." + table) # We specify the table we want to insert the data
)The default behavior of the Databricks Autoloader is to fail once it detects a schema update, but it can be changed with the following configuration.
So if you come back to our earlier example with the new email address, we need to setup Autloader with addNewColumns or failOnNewColumns. Changes will fail first, then don’t forget to relaunch the Autoloader in order to make it work (this is the normal behavior).
And that’s it, after that you can, like us, set up a trigger in Databricks to run the Autoloader, let’s say once a day. In our case, it made our life easier by automating the data ingestion.
How to bind Autoloader inferred Delta tables and dbt with the least minimum workload ?
Once you have your data loaded inside your lakehouse, you are able to start playing with dbt and doing your transformation to answer business needs.
But like any sources, dbt needs some manual work prior declaration. But like what we do with Autloader and Airbyte, we really want to simplify our Extract-Load framework and for doing this, we adapt dbt-codegen dbt macro to make it work with Databricks.
And just a little bit of context. We start looking at this mainly in regard to the ingestion of a database with more than 500 tables. Nobody ever wanted to write every source table by hand on dbt, so we had to find a way to import it in bulk. Also we are using Unity Catalog on Databricks and want a solution to comply with it.
Hence, we looked for https://github.com/dbt-labs/dbt-codegen, which is a package with macros to import every table from a database to a dbt .yml definition file. Since it doesn’t support (yet) Unity Catalog at this time, the macro can’t list the table from a Databricks Catalog.
So instead of using the whole package, we picked up the importing macro and tweaked it to make it work for us (the macro uses the dbt-utils package).
To do it, we set the dispatch option in our dbt_project.yml as follows.
dispatch:
- macro_namespace: dbt_utils
search_order: [‘our_project’, 'dbt_utils']-- get_tables_by_pattern.sql
{% macro default__get_tables_by_pattern_sql(schema_pattern, table_pattern, exclude='', database=target.database) %}
show tables in {{ schema_pattern }}
{% endmacro %}Using this configuration, the engine will start by looking in our own macros before looking for dbt-utils one.
Then we created 2 additional macros in order to get tables from Databricks.
-- get_relations_by_pattern.sql
{% macro default__get_relations_by_pattern(schema_pattern, table_pattern, exclude='', database=target.database) %}
{%- call statement('get_tables', fetch_result=True) %}
{{ dbt_utils.get_tables_by_pattern_sql(schema_pattern, table_pattern, exclude, database) }}
{%- endcall -%}
{%- set table_list = load_result('get_tables') -%}
{%- if table_list and table_list['table'] -%}
{%- set tbl_relations = [] -%}
{%- for row in table_list['table'] -%}
{%- set tbl_relation = api.Relation.create(
schema=row.database,
identifier=row.tableName
) -%}
{%- do tbl_relations.append(tbl_relation) -%}
{%- endfor -%}
{{ return(tbl_relations) }}
{%- else -%}
{{ return([]) }}
{%- endif -%}
{% endmacro %}Then, we rewrote generate_source.sql macro to add additional tests and make it fit our needs (such as customizing the dbt .yml output).
In order to finish the process, we need to split it into multiple files (one file per table) and use yq (like jq but for yaml) to rename filename the way we want.
sed -i "0,/---/d" all_tables.yaml
yq -s '.table_name' --no-doc all_tables.yaml && sed -i '2d' *.ymlThe final commands look like this:
dbt run-operation generate_source --args '{"schema_name": "schema_name", "catalog_name": "catalog_name", "include_data_types": "true", "generate_columns": "true", "include_descriptions": "true", "include_unit_test": "true"}' > all_tables.yaml && sed -i "0,/---/d" all_tables.yaml && yq -s '.table_name' --no-doc all_tables.yaml && sed -i '2d' *.ymlAnd voilà ! All our dbt table definitions are properly set up!
A final note
In conclusion, imagine you have unstructured data on s3 pushed by Airbyte. Here we use autoloader to infer schema automatically (and manage the breaking changes), then we modify codegen to handle it and automatically create the sources on dbt to make ELT easier.
This framework let us reduce time needed to ingest needed data sources, reduce inherent maintenance here and there definitely made our life easier.