Defining dependencies with asset factories
When building asset factories with components, you often need to define dependencies between the generated assets. For example, you may have ETL tables where some tables depend on others.
This guide shows how to:
- Define dependencies between component-generated assets
- Create regular assets that depend on component-generated assets
This guide assumes familiarity with asset factories.
Defining inter-dependencies between component assets
To define dependencies between assets generated by a component, include a deps field in your component's Model. This allows users to specify dependencies in the YAML configuration.
First, define the Model with a deps field:
import dagster as dg


class EtlTable(dg.Model):
    name: str
    deps: list[str] = []
    query: str
Then, use the deps field when creating assets in the component:
from dagster_snowflake import SnowflakeResource


class EtlTableFactory(dg.Component, dg.Model, dg.Resolvable):
    etl_tables: list[EtlTable]

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        assets = []
        for table in self.etl_tables:
            # The helper gives each asset its own table_config binding.
            def create_etl_asset(table_config: EtlTable):
                @dg.asset(name=table_config.name, deps=table_config.deps)
                def etl_table(snowflake: SnowflakeResource):
                    with snowflake.get_connection() as conn:
                        # cursor() is a method and must be called.
                        conn.cursor().execute(table_config.query)

                return etl_table

            assets.append(create_etl_asset(table))
        return dg.Definitions(assets=assets)
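One likely reason the asset is built inside the `create_etl_asset` helper rather than directly in the loop body is that Python closures look up free variables when they are called, not when they are defined. The following standalone sketch illustrates that behavior with plain functions instead of Dagster assets. The names `build_naive`, `build_with_factory`, and `make_callback` are hypothetical and exist only for this illustration.

```python
def build_naive(names):
    """Define callbacks directly in the loop; each closes over the loop variable."""
    callbacks = []
    for name in names:
        def callback():
            # `name` is looked up when the callback runs, after the loop has ended.
            return name
        callbacks.append(callback)
    return callbacks


def build_with_factory(names):
    """Use a helper so that each callback captures its own argument."""
    def make_callback(bound_name):
        def callback():
            # `bound_name` is fixed once per call to make_callback.
            return bound_name
        return callback

    return [make_callback(name) for name in names]


tables = ["cleaned_transactions", "risky_customers", "risky_transactions"]
naive_results = [cb() for cb in build_naive(tables)]
factory_results = [cb() for cb in build_with_factory(tables)]

print(naive_results)    # every callback sees the last loop value
print(factory_results)  # each callback sees its own value
```

In the naive version, every callback returns `risky_transactions`. Applied to the component, that pattern would make every generated asset run the last table's query, which is the problem the helper function avoids.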
Now you can define assets with dependencies in YAML. In this example, risky_transactions depends on both cleaned_transactions and risky_customers:
type: my_project.components.etl_table_component.EtlTableFactory

attributes:
  etl_tables:
    - name: cleaned_transactions
      deps:
        - transactions
      query: |
        create or replace table cleaned_transactions as (
          SELECT * FROM transactions WHERE amount IS NOT NULL
        )

    - name: risky_customers
      deps:
        - customers
      query: |
        create or replace table risky_customers as (
          SELECT * FROM customers WHERE risk_score > 0.8
        )

    - name: risky_transactions
      deps:
        - cleaned_transactions
        - risky_customers
      query: |
        create or replace table risky_transactions as (
          SELECT *
          FROM cleaned_transactions JOIN risky_customers
          ON cleaned_transactions.customer_id = risky_customers.customer_id
        )
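To see what ordering these `deps` entries imply, the dependency lists can be fed to a topological sort. This is a minimal sketch using the standard library's `graphlib` module (Python 3.9+). It does not show how Dagster resolves the asset graph internally, and the variable names `table_deps`, `order`, and `external_sources` are assumptions made for this illustration.

```python
from graphlib import TopologicalSorter

# deps lists copied from the YAML configuration above (table -> upstream names)
table_deps = {
    "cleaned_transactions": ["transactions"],
    "risky_customers": ["customers"],
    "risky_transactions": ["cleaned_transactions", "risky_customers"],
}

# TopologicalSorter accepts a mapping of node -> predecessors and yields
# each node only after all of its predecessors.
order = list(TopologicalSorter(table_deps).static_order())

# Names that appear only as deps are not generated by this component,
# so they must be defined elsewhere.
all_deps = {dep for deps in table_deps.values() for dep in deps}
external_sources = sorted(all_deps - set(table_deps))

print(order)
print(external_sources)
```

Any valid order places `risky_transactions` after both `cleaned_transactions` and `risky_customers`. The names `transactions` and `customers` show up as external sources because no entry in `etl_tables` produces them.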
Defining regular assets downstream of component assets
You can create regular assets that depend on component-generated assets by referencing their asset keys:
import dagster as dg


@dg.asset(
    deps=[
        "cleaned_transactions",
        "risky_customers",
        "risky_transactions",
    ]
)
def aggregated_metrics():
    """An asset that depends on all ETL tables generated by the component."""
    ...