Using resources in schedules
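This example demonstrates how to use resources in schedules. To declare a resource dependency, add the resource as a type-annotated parameter of the schedule's decorated function. The parameter name must match the key under which the resource is registered in Definitions (date_formatter in this example).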
Note: This article assumes familiarity with resources, code locations and definitions, and schedule testing.
All Dagster definitions, including schedules and resources, must be attached to a Definitions call.
from datetime import datetime

import dagster as dg


class DateFormatter(dg.ConfigurableResource):
    format: str

    def strftime(self, dt: datetime) -> str:
        return dt.strftime(self.format)


@dg.job
def process_data(): ...


@dg.schedule(job=process_data, cron_schedule="* * * * *")
def process_data_schedule(
    context: dg.ScheduleEvaluationContext,
    date_formatter: DateFormatter,
):
    formatted_date = date_formatter.strftime(context.scheduled_execution_time)

    return dg.RunRequest(
        run_key=None,
        tags={"date": formatted_date},
    )


defs = dg.Definitions(
    jobs=[process_data],
    schedules=[process_data_schedule],
    resources={"date_formatter": DateFormatter(format="%Y-%m-%d")},
)
APIs in this guide
Name | Description |
---|---|
@dg.schedule | Decorator that defines a schedule that executes according to a given cron schedule. |
ConfigurableResource | Base class for Dagster resources that use structured config. |
@dg.job | The decorator used to define a job. |
RunRequest | A class that represents all the information required to launch a single run. |
RunConfig | |
Definitions | A set of definitions explicitly available and loadable by Dagster tools. |