Components ETL pipeline tutorial
Setup
1. Install project dependencies
To complete this tutorial, you must install uv and dg.
First, install duckdb for a local database and tree to visualize project structure:
- Mac
- Windows
- Linux
tree is optional and is only used to produce a nicely formatted representation of the project structure on the command line. You can also use find, ls, dir, or any other directory listing command.
2. Scaffold a new project
After installing dependencies, scaffold a components-ready project:
dg scaffold project jaffle-platform
Creating a Dagster project at /.../jaffle-platform.
Scaffolded files for Dagster project at /.../jaffle-platform.
...
The dg scaffold project command builds a project at jaffle-platform and initializes a new Python virtual environment inside it. When you use dg's default environment management behavior, you won't need to worry about activating this virtual environment yourself.
To learn more about the files, directories, and default settings in a project scaffolded with dg scaffold project, see "Creating a project with components".
Ingest data
1. Add the Sling component type to your environment
To ingest data, you must set up Sling. However, if you list the available component types in your environment at this point, the Sling component won't appear, because the basic dagster-components package that was installed when you scaffolded your project doesn't include components for specific integrations (like Sling):
dg list component-type
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ definitions@dagster_components │ Wraps an arbitrary set of │
│ │ Dagster definitions. │
│ pipes_subprocess_script_collection@dagster_components │ Assets that wrap Python │
│ │ scripts executed with │
│ │ Dagster's │
│ │ PipesSubprocessClient. │
└───────────────────────────────────────────────────────┴────────────────────────────────┘
To make the Sling component available in your environment, install the sling extra of dagster-components:
uv add 'dagster-components[sling]'
dg always operates in an isolated environment, but it can access the set of component types available in your project environment because it attempts to resolve a project root whenever it is run. If dg finds a pyproject.toml file with a tool.dg.is_project = true setting, it expects a uv-managed virtual environment to be present in the same directory. (This can be confirmed by the presence of a uv.lock file.)
When you run commands like dg list component-type, dg obtains the results by identifying the in-scope project environment and querying it. In this case, the project environment was set up as part of the dg scaffold project command.
2. Confirm availability of the Sling component type
To confirm that the sling_replication_collection@dagster_components component type is now available, run the dg list component-type command again:
dg list component-type
Using /.../jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ definitions@dagster_components │ Wraps an arbitrary set of │
│ │ Dagster definitions. │
│ pipes_subprocess_script_collection@dagster_components │ Assets that wrap Python │
│ │ scripts executed with │
│ │ Dagster's │
│ │ PipesSubprocessClient. │
│ sling_replication_collection@dagster_components │ Expose one or more Sling │
│ │ replications to Dagster as │
│ │ assets. │
└───────────────────────────────────────────────────────┴────────────────────────────────┘
3. Create a new instance of the Sling component
Next, create a new instance of this component type:
dg scaffold component 'sling_replication_collection@dagster_components' ingest_files
Creating a Dagster component instance folder at /.../jaffle-platform/jaffle_platform/components/ingest_files.
Using /.../jaffle-platform/.venv/bin/dagster-components
This adds a component instance to the project at jaffle_platform/components/ingest_files:
tree jaffle_platform
jaffle_platform
├── __init__.py
├── components
│   └── ingest_files
│       └── component.yaml
├── definitions.py
└── lib
    └── __init__.py

4 directories, 4 files
A single file, component.yaml, was created in the component folder. The component.yaml file is common to all Dagster components, and specifies the component type and any parameters used to scaffold definitions from the component at runtime.
type: sling_replication_collection@dagster_components

attributes:
  replications:
    - path: replication.yaml
Right now the parameters define a single "replication". This is a Sling concept that specifies how data should be replicated from a source to a target. The details are specified in a replication.yaml file that is read by Sling. This file does not exist yet; you will create it shortly.
The path parameter for a replication is relative to the folder containing component.yaml. This is a convention for components.
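The relative-path convention can be sketched in a few lines. The helper name resolve_component_path is hypothetical and is not part of dg or dagster-components; it only illustrates that a path written in component.yaml is interpreted against the folder holding that file, not against the current working directory.

```python
import os
from pathlib import Path


def resolve_component_path(component_yaml: Path, relative_path: str) -> Path:
    """Interpret `relative_path` against the folder containing component.yaml.

    Hypothetical helper for illustration only. normpath collapses any `..`
    segments without touching the filesystem.
    """
    return Path(os.path.normpath(component_yaml.parent / relative_path))


if __name__ == "__main__":
    component_yaml = Path("jaffle_platform/components/ingest_files/component.yaml")
    # The replication file is looked up next to component.yaml.
    print(resolve_component_path(component_yaml, "replication.yaml"))
```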
4. Set up DuckDB
Set up and test DuckDB:
uv run sling conns set DUCKDB type=duckdb instance=/tmp/jaffle_platform.duckdb
9:00AM INF connection `DUCKDB` has been set in /.../.sling/env.yaml. Please test with `sling conns test DUCKDB`
uv run sling conns test DUCKDB
9:00AM INF success!
5. Download files for Sling source
Next, download some files locally to use as your Sling source, since Sling doesn't support reading from the public internet:
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv
Finally, create a replication.yaml file that references the downloaded files:
source: LOCAL
target: DUCKDB

defaults:
  mode: full-refresh
  object: "{stream_table}"

streams:
  file://raw_customers.csv:
    object: "main.raw_customers"
  file://raw_orders.csv:
    object: "main.raw_orders"
  file://raw_payments.csv:
    object: "main.raw_payments"
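Each stream in the file above follows the same pattern: a file:// source mapped to a table in the main schema named after the CSV file. As a minimal sketch of that pattern, the hypothetical helper below (not part of Sling or dg) builds the same text from a list of file names. The source, target, and defaults values are copied from the file above; everything else is an assumption for illustration.

```python
from pathlib import Path


def build_replication_yaml(csv_names: list[str], schema: str = "main") -> str:
    """Build replication.yaml text that maps each CSV file to `schema.<stem>`.

    Hypothetical helper: it only reproduces the layout shown in this tutorial.
    """
    lines = [
        "source: LOCAL",
        "target: DUCKDB",
        "",
        "defaults:",
        "  mode: full-refresh",
        '  object: "{stream_table}"',
        "",
        "streams:",
    ]
    for name in csv_names:
        table = Path(name).stem  # raw_customers.csv -> raw_customers
        lines.append(f"  file://{name}:")
        lines.append(f'    object: "{schema}.{table}"')
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    files = ["raw_customers.csv", "raw_orders.csv", "raw_payments.csv"]
    print(build_replication_yaml(files), end="")
```

Writing the file by hand is just as good for three streams; a generator like this is only worth it when the list of source files grows.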
6. View and materialize assets in the Dagster UI
Run dg dev to load your project in the Dagster UI and see what you've built so far. Then, to materialize the assets and load the tables into the DuckDB instance, click Materialize All:
dg dev
Verify the DuckDB tables on the command line:
duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM raw_customers LIMIT 5;"
┌───────┬────────────┬───────────┬──────────────────┐
│ id │ first_name │ last_name │ _sling_loaded_at │
│ int32 │ varchar │ varchar │ int64 │
├───────┼────────────┼───────────┼──────────────────┤
│ 1 │ Michael │ P. │ ... │
│ 2 │ Shawn │ M. │ ... │
│ 3 │ Kathleen │ P. │ ... │
│ 4 │ Jimmy │ C. │ ... │
│ 5 │ Katherine │ R. │ ... │
└───────┴────────────┴───────────┴──────────────────┘
Transform data
To transform the data, you will need to download a sample dbt project from GitHub and use the data ingested with Sling as an input for the dbt project.
1. Clone a sample dbt project from GitHub
First, clone the project and delete the embedded git repo:
git clone --depth=1 https://github.com/dagster-io/jaffle-platform.git dbt && rm -rf dbt/.git
2. Install the dbt project component type
To interface with the dbt project, you will need to instantiate a Dagster dbt project component. To access the dbt project component type, install dagster-components[dbt] and dbt-duckdb:
uv add 'dagster-components[dbt]' dbt-duckdb
To confirm that the dbt_project@dagster_components component type is now available, run dg list component-type:
dg list component-type
Using /.../jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dbt_project@dagster_components │ Expose a DBT project to │
│ │ Dagster as a set of assets. │
│ definitions@dagster_components │ Wraps an arbitrary set of │
│ │ Dagster definitions. │
│ pipes_subprocess_script_collection@dagster_components │ Assets that wrap Python │
│ │ scripts executed with │
│ │ Dagster's │
│ │ PipesSubprocessClient. │
│ sling_replication_collection@dagster_components │ Expose one or more Sling │
│ │ replications to Dagster as │
│ │ assets. │
└───────────────────────────────────────────────────────┴────────────────────────────────┘
3. Scaffold a new instance of the dbt project component
Next, scaffold a new instance of the dbt_project@dagster_components component, providing the path to the dbt project you cloned earlier as the project_path scaffold parameter:
dg scaffold component dbt_project@dagster_components jdbt --project-path dbt/jdbt
Creating a Dagster component instance folder at /.../jaffle-platform/jaffle_platform/components/jdbt.
Using /.../jaffle-platform/.venv/bin/dagster-components
This creates a new component instance in the project at jaffle_platform/components/jdbt. To see the component configuration, open component.yaml in that directory:
type: dbt_project@dagster_components

attributes:
  dbt:
    project_dir: ../../../dbt/jdbt
4. Update the dbt project component configuration
Let’s see the project in the Dagster UI:
dg dev
You can see that there appear to be two copies of the raw_customers, raw_orders, and raw_payments tables. If you click on the assets, you can see their full asset keys. The keys generated by the dbt component are of the form main/*, whereas the keys generated by the Sling component are of the form target/main/*.
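The mismatch can be illustrated with plain strings. The sketch below is not how Dagster renders keys: render_key is a hypothetical stand-in that substitutes only the {{ node.name }} placeholder, and the main/{{ node.name }} template is an assumption that mirrors the main/* form described above. It shows that the two sets of keys share nothing until the dbt keys carry the same target/ prefix as the Sling keys.

```python
TABLES = ["raw_customers", "raw_orders", "raw_payments"]


def render_key(template: str, node_name: str) -> str:
    """Hypothetical stand-in for template rendering: fills in `{{ node.name }}` only."""
    return template.replace("{{ node.name }}", node_name)


def shared_keys(dbt_key_template: str) -> set[str]:
    """Return the asset keys that the Sling and dbt sides would have in common."""
    sling_keys = {f"target/main/{table}" for table in TABLES}
    dbt_keys = {render_key(dbt_key_template, table) for table in TABLES}
    return sling_keys & dbt_keys


if __name__ == "__main__":
    # No overlap: each table shows up twice in the asset graph.
    print(len(shared_keys("main/{{ node.name }}")))
    # Full overlap: the dbt sources line up with the Sling assets.
    print(len(shared_keys("target/main/{{ node.name }}")))
```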
We need to update the configuration of the dbt_project@dagster_components component to match the keys generated by the Sling component. Update components/jdbt/component.yaml with the configuration below:
type: dbt_project@dagster_components

attributes:
  dbt:
    project_dir: ../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}
You might notice the typo in the above file. After updating a component file, it's useful to validate that the changes match the component's schema. You can do this by running dg check yaml:
dg check yaml
/.../jaffle-platform/jaffle_platform/components/jdbt/component.yaml:7 - Unable to parse YAML: while scanning a quoted scalar, found unexpected end of stream
|
6 | asset_attributes:
7 | key: "target/main/{{ node.name }}
| ^ Unable to parse YAML: while scanning a quoted scalar, found unexpected end of stream
|
You can see that the error message includes the filename, line number, and a code snippet showing the exact nature of the error. Let's fix the typo:
type: dbt_project@dagster_components

attributes:
  dbt:
    project_dir: ../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}"
Finally, run dg check yaml again to validate the fix:
dg check yaml
All components validated successfully.
Reload the project in the Dagster UI to verify that the keys load properly.
Now the keys generated by the Sling and dbt project components match, and the asset graph is correct. To materialize the new assets defined via the dbt project component, click Materialize All.
To verify the fix, you can view a sample of the newly materialized assets in DuckDB from the command line:
duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM orders LIMIT 5;"
┌──────────┬─────────────┬────────────┬───┬───────────────┬──────────────────────┬──────────────────┬────────┐
│ order_id │ customer_id │ order_date │ … │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │
│ int32 │ int32 │ date │ │ double │ double │ double │ double │
├──────────┼─────────────┼────────────┼───┼───────────────┼──────────────────────┼──────────────────┼────────┤
│ 1 │ 1 │ 2018-01-01 │ … │ 0.0 │ 0.0 │ 0.0 │ 10.0 │
│ 2 │ 3 │ 2018-01-02 │ … │ 0.0 │ 0.0 │ 0.0 │ 20.0 │
│ 3 │ 94 │ 2018-01-04 │ … │ 1.0 │ 0.0 │ 0.0 │ 1.0 │
│ 4 │ 50 │ 2018-01-05 │ … │ 25.0 │ 0.0 │ 0.0 │ 25.0 │
│ 5 │ 64 │ 2018-01-05 │ … │ 0.0 │ 17.0 │ 0.0 │ 17.0 │
├──────────┴─────────────┴────────────┴───┴───────────────┴──────────────────────┴──────────────────┴────────┤
│ 5 rows 9 columns (7 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Automate the pipeline
1. Automate Sling ingestion
Now that you've defined some assets, you can automate them to keep them up to date by using declarative automation directly in the component.yaml file.
Navigate to components/ingest_files/component.yaml and add the automation_condition below to automatically pull in data with Sling every day:
type: sling_replication_collection@dagster_components

attributes:
  replications:
    - path: replication.yaml
  asset_attributes:
    - target: "*"
      attributes:
        automation_condition: "{{ automation_condition.on_cron('@daily') }}"
        metadata:
          automation_condition: "on_cron(@daily)"
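In standard cron syntax, @daily is shorthand for 0 0 * * *, that is, once a day at midnight. The sketch below only illustrates when such a tick falls. The helper name next_daily_tick is hypothetical, and neither time zones nor the way Dagster evaluates the condition is modeled here.

```python
from datetime import datetime, timedelta

# The five-field cron expression that the '@daily' alias stands for.
DAILY_CRON = "0 0 * * *"


def next_daily_tick(now: datetime) -> datetime:
    """Return the first midnight strictly after `now` (naive datetimes only).

    Hypothetical helper for illustration; it does not call into Dagster.
    """
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)


if __name__ == "__main__":
    print(next_daily_tick(datetime.now()))
```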
2. Automate dbt transformation
Next, update the dbt project so it executes after the Sling replication runs. Navigate to components/jdbt/component.yaml and add the automation_condition below:
type: dbt_project@dagster_components

attributes:
  dbt:
    project_dir: ../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}"
  transforms:
    - target: "*"
      attributes:
        automation_condition: "{{ automation_condition.eager() }}"
        metadata:
          automation_condition: "eager"
Next steps
To continue your journey with components, you can add more components to your project or learn how to manage multiple projects with components.