# Prefect to Dagster
Prefect flows are collections of tasks with an implicit execution order. The migration to Dagster maps each @task to an @asset and makes the dependency chain explicit with deps.
A Prefect flow with three tasks:
```python
from prefect import flow, task

@task
def fetch_data():
    return {"records": [{"id": i, "value": i * 10} for i in range(1, 6)]}

@task
def process_data(data):
    return {"total": sum(r["value"] for r in data["records"]), "count": len(data["records"])}

@task
def save_results(results):
    print(f"Saving {results['count']} records with total {results['total']}")
    return results

@flow(name="data-processing-flow")
def data_processing_flow():
    data = fetch_data()
    results = process_data(data)
    save_results(results)
```
Three Dagster assets with a schedule:
"""Migration of a Prefect flow to Dagster assets.
Before (Prefect data-processing-flow):
from prefect import flow, task
@task
def fetch_data(): ... # returns {"records": [...]}
@task
def process_data(data): ... # returns {"total": ..., "count": ...}
@task
def save_results(results): ... # persists to storage
@flow(name="data-processing-flow")
def data_processing_flow():
data = fetch_data()
results = process_data(data)
save_results(results)
Each Prefect @task becomes a @dg.asset. The flow's implicit dependency chain
(fetch → process → save) is expressed via deps=. Schedules are attached the
same way as any other Dagster job.
"""
import dagster as dg
@dg.asset(group_name="prefect_migration")
def extract_records(context: dg.AssetExecutionContext):
"""Fetch records from an API. Mirrors Prefect @task fetch_data."""
context.log.info("Fetching data from API...")
records = [{"id": i, "value": i * 10} for i in range(1, 6)]
context.log.info(f"Fetched {len(records)} records")
return {"records": records}
@dg.asset(deps=[extract_records], group_name="prefect_migration")
def transform_records(context: dg.AssetExecutionContext):
"""Process extracted records. Mirrors Prefect @task process_data."""
# In production, load from storage or upstream asset output
records = [{"id": i, "value": i * 10} for i in range(1, 6)]
context.log.info(f"Processing {len(records)} records...")
return {
"total": sum(r["value"] for r in records),
"count": len(records),
}
@dg.asset(deps=[transform_records], group_name="prefect_migration")
def load_records(context: dg.AssetExecutionContext):
"""Persist transformed records. Mirrors Prefect @task save_results."""
context.log.info("Saving results to storage...")
# In production, write to your data store here
return dg.MaterializeResult(
metadata={
"rows_written": 5,
}
)
_etl_job = dg.define_asset_job(
"prefect_etl_job",
selection=[extract_records, transform_records, load_records],
)
@dg.schedule(cron_schedule="0 9 * * *", job=_etl_job)
def prefect_etl_schedule():
"""Daily run replacing Prefect's deployment schedule."""
yield dg.RunRequest()
## Changes
**Tasks become assets; return values become deps.** Each Prefect `@task` becomes a `@dg.asset`. The implicit execution order inside the `@flow` function (fetch → process → save) is made explicit with `deps`. Note that unlike Prefect, where a task's return value flows directly into the next task's arguments, `deps` in Dagster only controls execution order: each asset reads its inputs from your storage layer rather than from an in-memory return value. If you want Dagster to manage the data handoff automatically, use an I/O manager.
**Schedule.** The `@flow` decorator and its deployment schedule are replaced by a `@dg.schedule` wrapping a `define_asset_job`. Unlike a Prefect deployment, the schedule is version-controlled alongside the asset definitions and visible in the Dagster UI without a separate deployment step.
**Prefect Blocks become Dagster resources.** Prefect Blocks (database connections, API credentials, storage configs) become Dagster resources: define a resource class, configure it per environment in your `Definitions`, and inject it into assets via the function signature.
**Prefect automations become Dagster sensors.** Prefect automations that trigger flows based on events or external state become `@dg.sensor` definitions. A sensor polls for a condition on each tick and yields a `RunRequest` when the condition is met.