# Basic Airflow migration
This guide covers manually rewriting Airflow DAGs as native Dagster assets. This approach works well for a small number of DAGs and produces first-class Dagster assets with materialization history and staleness tracking.
For larger migrations, the dagster-airflow library can convert Airflow operators directly to Dagster assets, letting you run your existing DAG code inside Dagster with minimal changes. The tradeoff: you get Dagster run history and scheduling, but assets produced this way may lack full native features like fine-grained materialization history and staleness tracking compared to a manual rewrite.
## Before

An Airflow DAG with three tasks and a daily schedule:
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@task
def fetch_data(source: str = "api") -> dict:
    return {"source": source, "data": [1, 2, 3, 4, 5]}


@task
def process_data(data: dict) -> dict:
    return {"source": data["source"], "processed": [x * 2 for x in data["data"]]}


@task
def save_results(processed: dict) -> str:
    return f"Saved {len(processed['processed'])} items from {processed['source']}"


@dag(
    dag_id="simple_sequential_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="0 9 * * *",
    catchup=False,
    tags=["example", "sequential"],
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)
def simple_sequential_pipeline():
    data = fetch_data()
    processed = process_data(data)
    save_results(processed)


dag_instance = simple_sequential_pipeline()
```
## After

Three Dagster assets with a schedule:
"""Migration of an Airflow DAG to Dagster assets.
Before (Airflow simple_sequential_pipeline):
from airflow.decorators import dag, task
@task
def fetch_data(source: str = "api") -> dict: ...
@task
def process_data(data: dict) -> dict: ...
@task
def save_results(processed: dict) -> str: ...
@dag(schedule="0 9 * * *", tags=["example", "sequential"])
def simple_sequential_pipeline():
data = fetch_data()
processed = process_data(data)
save_results(processed)
Each Airflow @task becomes a @dg.asset. Dependencies are expressed via the
`deps=` argument instead of XCom return values. The DAG schedule becomes a
@dg.schedule wrapping a define_asset_job.
"""
import dagster as dg
@dg.asset(group_name="airflow_migration", tags={"example": "", "sequential": ""})
def fetch_data(context: dg.AssetExecutionContext):
"""Fetch data from a source. Mirrors Airflow @task fetch_data."""
source = "api"
context.log.info(f"Fetching data from {source}")
return {"source": source, "data": [1, 2, 3, 4, 5]}
@dg.asset(
deps=[fetch_data],
group_name="airflow_migration",
tags={"example": "", "sequential": ""},
)
def process_data(context: dg.AssetExecutionContext):
"""Process fetched data. Mirrors Airflow @task process_data."""
context.log.info("Processing data from fetch_data")
return {"source": "api", "processed": [x * 2 for x in [1, 2, 3, 4, 5]]}
@dg.asset(
deps=[process_data],
group_name="airflow_migration",
tags={"example": "", "sequential": ""},
retry_policy=dg.RetryPolicy(max_retries=3, delay=300),
)
def save_results(context: dg.AssetExecutionContext):
"""Save processed results. Mirrors Airflow @task save_results.
retry_policy replaces Airflow's default_args retries/retry_delay.
"""
context.log.info("Saving 5 processed items from api")
return "Saved 5 items from api"
_sequential_job = dg.define_asset_job(
"sequential_pipeline_job",
selection=[fetch_data, process_data, save_results],
)
@dg.schedule(cron_schedule="0 9 * * *", job=_sequential_job)
def daily_9am_schedule():
"""Run the sequential pipeline daily at 9am. Mirrors Airflow schedule='0 9 * * *'."""
yield dg.RunRequest()
## Changes
**Tasks become assets.** Each Airflow `@task` becomes a `@dg.asset`.
**XCom becomes `deps`.** Airflow passes data between tasks through XCom (serialized values in the metadata database). Dagster uses `deps`, which declares execution order, not data transfer. When the `process_data` asset declares `deps=[fetch_data]`, Dagster ensures the `fetch_data` asset materializes first, but `process_data` reads its inputs from your storage layer (a database, S3, etc.), not from a return value. If you want Dagster to handle the handoff automatically, declare the upstream asset as a function parameter and configure an I/O manager.
**Retry policy moves to the asset.** The DAG-level `default_args` retries become a `RetryPolicy` attached directly to the asset that needs it.
**DAG tags become group names and tags.** Airflow DAG tags map to `group_name` and `tags` on each asset, giving you the same grouping in the Dagster UI.
**The schedule becomes a `@dg.schedule`.** The `@dag(schedule="0 9 * * *")` argument becomes a `@dg.schedule` wrapping a `define_asset_job` that selects the three assets.
**Connections become resources.** Airflow Connections (database URLs, API credentials stored in the Airflow metadata DB) become Dagster resources. Define a resource class and inject it into your assets via the function signature; the equivalent of `Variable.get()` or `BaseHook.get_connection()` is a resource attribute read at runtime.
**Sensors become `@dg.sensor` definitions.** Airflow's `ExternalTaskSensor` and `FileSensor` become Dagster `@dg.sensor` definitions. A sensor polls for a condition and yields a `RunRequest` when it's met: the same pattern, but version-controlled alongside your asset code.
**Backfill behavior.** Airflow's `catchup=True` is the default; Dagster has no direct equivalent. If you need historical runs, launch an explicit backfill from the Dagster UI or CLI instead of relying on implicit catchup.