
Transformations with dbt (Part 2/3)

Cloud: Not available | Self-Managed Community (OSS): Available | Self-Managed Enterprise: Available
warning

Normalization and Custom Transformation are deprecated features. Destinations using Normalization will be replaced by Typing and Deduping. Custom Transformation will be removed on March 31. For more information, visit here.

This tutorial describes how to integrate SQL-based transformations with Airbyte syncs using a specialized transformation tool: dbt.

This tutorial is the second part of a series and follows the previous tutorial, Transformations with SQL. Next, we'll wrap up with a third part on submitting transformations back to Airbyte: Transformations with Airbyte.

(Example outputs are updated with Airbyte version 0.23.0-alpha from May 2021)

Transformations with dbt

The tool in charge of transformation behind the scenes is actually called dbt (Data Build Tool).

Before generating the SQL files we saw in the previous tutorial, Airbyte sets up a dbt Docker instance and automatically generates a dbt project for us. This project is created as specified in the dbt project documentation page, with the right credentials for the target destination. The dbt models are then run using the dbt CLI. For now, however, let's walk through working with the dbt tool ourselves.

Validate dbt project settings

Let's say we have identified our workspace (as shown in the previous tutorial, Transformations with SQL) and that the workspace ID is:

NORMALIZE_WORKSPACE="5/0/"
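
In case you need to rediscover job and attempt IDs like these, you can search the workspace volume for generated normalize folders. Here is a minimal sketch, assuming the default airbyte_workspace Docker volume:

#!/usr/bin/env bash
# Each generated dbt project lives under /data/<job_id>/<attempt_id>/normalize;
# the path segments give you the value for NORMALIZE_WORKSPACE.
docker run --rm -i -v airbyte_workspace:/data busybox find /data -type d -name normalize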

We can verify that the dbt project is properly configured for that workspace:

#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data \
  -w /data/$NORMALIZE_WORKSPACE/normalize \
  --network host \
  --entrypoint /usr/local/bin/dbt \
  airbyte/normalization debug --profiles-dir=. --project-dir=.

Example Output:

Running with dbt=0.19.1
dbt version: 0.19.1
python version: 3.8.8
python path: /usr/local/bin/python
os info: Linux-5.10.25-linuxkit-x86_64-with-glibc2.2.5
Using profiles.yml file at ./profiles.yml
Using dbt_project.yml file at /data/5/0/normalize/dbt_project.yml

Configuration:
  profiles.yml file [OK found and valid]
  dbt_project.yml file [OK found and valid]

Required dependencies:
  - git [OK found]

Connection:
  host: localhost
  port: 3000
  user: postgres
  database: postgres
  schema: quarantine
  search_path: None
  keepalives_idle: 0
  sslmode: None
  Connection test: OK connection ok
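
Besides dbt debug, you can also read the generated project files directly from the workspace volume, for instance to see which profile and model folders the project declares. A minimal sketch, assuming the same volume and NORMALIZE_WORKSPACE variable as above (the file layout may vary across Airbyte versions):

#!/usr/bin/env bash
# Print the generated dbt project settings (paths, materializations, profile name)
docker run --rm -i -v airbyte_workspace:/data busybox cat /data/$NORMALIZE_WORKSPACE/normalize/dbt_project.yml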

Compile and build dbt normalization models

If the previous command does not report any errors or discrepancies, we can now invoke the dbt CLI from within the Docker image to trigger the transformation processing:

#!/usr/bin/env bash
docker run --rm -i -v airbyte_workspace:/data \
  -w /data/$NORMALIZE_WORKSPACE/normalize \
  --network host \
  --entrypoint /usr/local/bin/dbt \
  airbyte/normalization run --profiles-dir=. --project-dir=.

Example Output:

Running with dbt=0.19.1
Found 4 models, 0 tests, 0 snapshots, 0 analyses, 364 macros, 0 operations, 0 seed files, 1 source, 0 exposures

Concurrency: 32 threads (target='prod')

1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 35822 in 0.47s]

Finished running 1 table model in 0.74s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
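
To double-check the result outside dbt, you can query the destination directly. The following is a hypothetical sanity check assuming a local psql client and the Postgres destination shown in the debug output above (localhost:3000, user postgres, database postgres); replace the password placeholder with your destination's credentials:

#!/usr/bin/env bash
# Count the rows materialized by the dbt run; this should match the row count
# reported in the run output above. Credentials below are placeholders.
PGPASSWORD=<your-password> psql -h localhost -p 3000 -U postgres -d postgres \
  -c "select count(*) from quarantine.covid_epidemiology;"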

Exporting the dbt normalization project outside Airbyte

As seen in the tutorial on exploring the workspace folder, you can browse the normalize folder and examine further logs if an error occurs.
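
For example, dbt writes detailed execution logs inside that folder; the exact file name and location can vary by dbt version (commonly logs/dbt.log). A quick way to locate them, assuming the same volume and variable as before:

#!/usr/bin/env bash
# Look for dbt log files inside the workspace's normalize folder
docker run --rm -i -v airbyte_workspace:/data busybox find /data/$NORMALIZE_WORKSPACE/normalize -name "*.log"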

In particular, we can also take a look at the dbt models generated by Airbyte and export them to the local host filesystem:

#!/usr/bin/env bash

TUTORIAL_DIR="$(pwd)/tutorial"
rm -rf "$TUTORIAL_DIR/normalization-files"
mkdir -p "$TUTORIAL_DIR/normalization-files"

docker cp airbyte-server:/tmp/workspace/$NORMALIZE_WORKSPACE/normalize/ "$TUTORIAL_DIR/normalization-files"

NORMALIZE_DIR="$TUTORIAL_DIR/normalization-files/normalize"
cd "$NORMALIZE_DIR"
# Enable ** to match nested folders, so all generated .sql files are printed
shopt -s globstar
cat "$NORMALIZE_DIR"/models/generated/**/*.sql

Example Output:

{{ config(alias="covid_epidemiology_ab1", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
{{ json_extract_scalar('_airbyte_data', ['key']) }} as {{ adapter.quote('key') }},
{{ json_extract_scalar('_airbyte_data', ['date']) }} as {{ adapter.quote('date') }},
{{ json_extract_scalar('_airbyte_data', ['new_tested']) }} as new_tested,
{{ json_extract_scalar('_airbyte_data', ['new_deceased']) }} as new_deceased,
{{ json_extract_scalar('_airbyte_data', ['total_tested']) }} as total_tested,
{{ json_extract_scalar('_airbyte_data', ['new_confirmed']) }} as new_confirmed,
{{ json_extract_scalar('_airbyte_data', ['new_recovered']) }} as new_recovered,
{{ json_extract_scalar('_airbyte_data', ['total_deceased']) }} as total_deceased,
{{ json_extract_scalar('_airbyte_data', ['total_confirmed']) }} as total_confirmed,
{{ json_extract_scalar('_airbyte_data', ['total_recovered']) }} as total_recovered,
_airbyte_emitted_at
from {{ source('quarantine', '_airbyte_raw_covid_epidemiology') }}
-- covid_epidemiology

{{ config(alias="covid_epidemiology_ab2", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
cast({{ adapter.quote('key') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('key') }},
cast({{ adapter.quote('date') }} as {{ dbt_utils.type_string() }}) as {{ adapter.quote('date') }},
cast(new_tested as {{ dbt_utils.type_float() }}) as new_tested,
cast(new_deceased as {{ dbt_utils.type_float() }}) as new_deceased,
cast(total_tested as {{ dbt_utils.type_float() }}) as total_tested,
cast(new_confirmed as {{ dbt_utils.type_float() }}) as new_confirmed,
cast(new_recovered as {{ dbt_utils.type_float() }}) as new_recovered,
cast(total_deceased as {{ dbt_utils.type_float() }}) as total_deceased,
cast(total_confirmed as {{ dbt_utils.type_float() }}) as total_confirmed,
cast(total_recovered as {{ dbt_utils.type_float() }}) as total_recovered,
_airbyte_emitted_at
from {{ ref('covid_epidemiology_ab1_558') }}
-- covid_epidemiology

{{ config(alias="covid_epidemiology_ab3", schema="_airbyte_quarantine", tags=["top-level-intermediate"]) }}
-- SQL model to build a hash column based on the values of this record
select
*,
{{ dbt_utils.surrogate_key([
adapter.quote('key'),
adapter.quote('date'),
'new_tested',
'new_deceased',
'total_tested',
'new_confirmed',
'new_recovered',
'total_deceased',
'total_confirmed',
'total_recovered',
]) }} as _airbyte_covid_epidemiology_hashid
from {{ ref('covid_epidemiology_ab2_558') }}
-- covid_epidemiology

{{ config(alias="covid_epidemiology", schema="quarantine", tags=["top-level"]) }}
-- Final base SQL model
select
{{ adapter.quote('key') }},
{{ adapter.quote('date') }},
new_tested,
new_deceased,
total_tested,
new_confirmed,
new_recovered,
total_deceased,
total_confirmed,
total_recovered,
_airbyte_emitted_at,
_airbyte_covid_epidemiology_hashid
from {{ ref('covid_epidemiology_ab3_558') }}
-- covid_epidemiology from {{ source('quarantine', '_airbyte_raw_covid_epidemiology') }}

If you have dbt installed locally on your machine, you can then view, edit, version, customize, and run the dbt models in your project outside Airbyte syncs.

#!/usr/bin/env bash

dbt deps --profiles-dir="$NORMALIZE_DIR" --project-dir="$NORMALIZE_DIR"
dbt run --profiles-dir="$NORMALIZE_DIR" --project-dir="$NORMALIZE_DIR" --full-refresh

Example Output:

Running with dbt=0.19.1
Installing https://github.com/fishtown-analytics/dbt-utils.git@0.6.4
Installed from revision 0.6.4

Running with dbt=0.19.1
Found 4 models, 0 tests, 0 snapshots, 0 analyses, 364 macros, 0 operations, 0 seed files, 1 source, 0 exposures

Concurrency: 32 threads (target='prod')

1 of 1 START table model quarantine.covid_epidemiology....................................................... [RUN]
1 of 1 OK created table model quarantine.covid_epidemiology.................................................. [SELECT 35822 in 0.44s]

Finished running 1 table model in 0.63s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Now that you've exported the generated normalization models, you can edit and tweak them as necessary.
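
For example, once you've made a change, you can rebuild only a subset of models instead of the whole project. Here is a minimal sketch using dbt's tag selector, relying on the top-level tag that appears in the generated configs shown earlier:

#!/usr/bin/env bash
# Rebuild only the final models tagged "top-level", skipping the intermediate ones
dbt run --models tag:top-level --profiles-dir="$NORMALIZE_DIR" --project-dir="$NORMALIZE_DIR"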

If you want to know how to push your modifications back to Airbyte and use your updated dbt project during Airbyte syncs, you can continue with the following tutorial on importing transformations into Airbyte...