
Freshness policies

info

Freshness policies are under active development. You may encounter feature gaps and the APIs may change.

Freshness policies help you understand which of your assets have materialized recently and which are running behind, a key component of asset health. They also communicate expectations for data freshness, letting downstream consumers of an asset know how often it is expected to be updated.

For example, freshness policies can help identify stale assets caused by:

  • Misconfigured AutomationConditions
  • Runs not being scheduled due to an upstream failure
  • Runs taking longer than expected to complete

Enabling freshness policies

Freshness policies are not enabled by default while in preview. To use them in open source and local development, add the following to your dagster.yaml:

freshness:
  enabled: True

To use freshness policies in Dagster+, sign up for the Observability update early access program.

Relationship to existing FreshnessPolicy

There is an existing FreshnessPolicy API that has been deprecated since version 1.6. We're opting to reuse the name for the new freshness APIs, and have renamed the deprecated functionality to LegacyFreshnessPolicy. To continue using the deprecated functionality, follow the instructions in the 1.11 migration guide.

Relationship to freshness checks

Freshness policies are not yet recommended for production use, but they are intended to supersede freshness checks in a future release. Freshness checks remain supported and will continue to be for the foreseeable future. A migration guide will be provided.

Freshness policy types

Currently, two freshness policy types are supported: time window and cron. Time window policies are suitable for most use cases. We plan to add more policy types in the future.

Time window

A time window freshness policy is useful when you expect an asset to have new data or be recalculated with some frequency. An asset that does not meet this condition is considered to be failing its freshness policy. You can also set an optional warning window; if the asset does not successfully materialize within this window, it enters a warning freshness state.

For example, the policy below states that there should be a successful materialization of the asset at least every 24 hours for it to be considered fresh, with a warning window of 12 hours:

from datetime import timedelta

from dagster.preview.freshness import FreshnessPolicy

# Create a freshness policy that requires a materialization at least once every 24 hours,
# and warns if the latest materialization is older than 12 hours.
policy = FreshnessPolicy.time_window(
    fail_window=timedelta(hours=24), warn_window=timedelta(hours=12)
)

info
  • fail_window and warn_window cannot be shorter than 60 seconds.
  • warn_window must be less than fail_window.
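To make the pass/warn/fail semantics concrete, here is a minimal pure-Python sketch of how a time window policy could be evaluated. It is not Dagster's implementation: the helper names (validate_time_window, time_window_state) and the string states are hypothetical, and treating a never-materialized asset as failing is an assumption. It compares the age of the latest successful materialization against the two windows and enforces the constraints listed above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MIN_WINDOW = timedelta(seconds=60)


def validate_time_window(fail_window: timedelta, warn_window: Optional[timedelta] = None) -> None:
    """Check the documented window constraints (illustrative only)."""
    if fail_window < MIN_WINDOW:
        raise ValueError("fail_window cannot be shorter than 60 seconds")
    if warn_window is not None:
        if warn_window < MIN_WINDOW:
            raise ValueError("warn_window cannot be shorter than 60 seconds")
        if warn_window >= fail_window:
            raise ValueError("warn_window must be less than fail_window")


def time_window_state(
    last_materialized: Optional[datetime],
    now: datetime,
    fail_window: timedelta,
    warn_window: Optional[timedelta] = None,
) -> str:
    """Return "PASS", "WARN", or "FAIL" from the age of the latest materialization."""
    validate_time_window(fail_window, warn_window)
    if last_materialized is None:
        return "FAIL"  # assumption: a never-materialized asset counts as failing
    age = now - last_materialized
    if age > fail_window:
        return "FAIL"
    if warn_window is not None and age > warn_window:
        return "WARN"
    return "PASS"


# Same windows as the example policy: fail after 24 hours, warn after 12 hours.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
print(time_window_state(now - timedelta(hours=18), now, timedelta(hours=24), timedelta(hours=12)))  # WARN
```

An 18-hour-old materialization is inside the 24-hour fail window but outside the 12-hour warning window, so the sketch reports a warning.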

Cron

A cron freshness policy is useful when you expect an asset to have new data or be recalculated on a known schedule.

The policy defines a cron schedule, deadline_cron, that denotes the deadline for the asset materialization. To account for the time it takes to materialize the asset, you also specify lower_bound_delta, a timedelta that denotes an amount of time before each cron tick. Together, deadline_cron and lower_bound_delta define a recurring time window in which the asset is expected to materialize.

The asset is fresh if it materializes in this time window, and will remain fresh until at least the next deadline. If the asset has not materialized in the window after the deadline passes, it will fail freshness until it materializes again.

Example:

from datetime import timedelta

from dagster import asset
from dagster.preview.freshness import FreshnessPolicy


@asset(
    freshness_policy=FreshnessPolicy.cron(
        deadline_cron="0 10 * * *",
        lower_bound_delta=timedelta(hours=1),
        timezone="America/Los_Angeles",
    )
)
def daily_asset():
    """Expected to materialize every day between 9:00am and 10:00am Pacific Time.

    If the asset materializes between 9am and 10am, it is fresh, and will remain fresh
    until at least the next deadline (10am the next day).

    If the asset has not materialized by 10am, it fails the freshness policy. It will
    remain in the fail state until it materializes again. Once it materializes, it will
    become fresh and remain fresh until at least the next deadline (10am the next day).
    """
    pass

info
  • deadline_cron must be a valid cron string and has a minimum resolution of 1 minute.
  • lower_bound_delta cannot be shorter than 1 minute, and must fit within the smallest interval of deadline_cron. Example: for deadline_cron="0 10 * * 1-5" (weekdays at 10am), lower_bound_delta must be between 1 minute and 24 hours.
  • timezone is optional. IANA timezones are supported. If not provided, defaults to UTC.
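The cron window logic can be sketched the same way. This minimal example assumes a once-a-day deadline given as a time of day in UTC rather than a full cron string (there is no cron parsing), and its helper names are hypothetical; it illustrates the behavior described above and is not Dagster's evaluation code. It finds the most recent deadline, subtracts lower_bound_delta to get the window start, and treats any materialization at or after that start as fresh, which covers both on-time and late materializations.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional


def validate_daily_lower_bound(lower_bound_delta: timedelta) -> None:
    # With one deadline per day, the smallest cron interval is 24 hours.
    if not timedelta(minutes=1) <= lower_bound_delta <= timedelta(hours=24):
        raise ValueError("lower_bound_delta must be between 1 minute and 24 hours")


def latest_deadline(now: datetime, deadline: time) -> datetime:
    """Return the most recent daily deadline at or before `now` (same timezone as `now`)."""
    candidate = datetime.combine(now.date(), deadline, tzinfo=now.tzinfo)
    if candidate > now:
        candidate -= timedelta(days=1)
    return candidate


def daily_cron_state(
    last_materialized: Optional[datetime],
    now: datetime,
    deadline: time,
    lower_bound_delta: timedelta,
) -> str:
    """Return "PASS" or "FAIL" for a hypothetical once-a-day cron policy."""
    validate_daily_lower_bound(lower_bound_delta)
    if last_materialized is None:
        return "FAIL"  # assumption: a never-materialized asset counts as failing
    window_start = latest_deadline(now, deadline) - lower_bound_delta
    # Fresh if the asset materialized inside the window before the latest deadline,
    # or at any time after that (a late materialization restores freshness).
    return "PASS" if last_materialized >= window_start else "FAIL"


# Roughly deadline_cron="0 10 * * *" with lower_bound_delta=timedelta(hours=1), in UTC.
ten_am, one_hour = time(10, 0), timedelta(hours=1)
materialized = datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)
before_next = datetime(2024, 1, 2, 9, 45, tzinfo=timezone.utc)
after_next = datetime(2024, 1, 2, 10, 1, tzinfo=timezone.utc)
print(daily_cron_state(materialized, before_next, ten_am, one_hour))  # PASS
print(daily_cron_state(materialized, after_next, ten_am, one_hour))  # FAIL
```

A 9:30am materialization stays fresh through the next morning and starts failing once the following 10am deadline passes without a new materialization.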

Setting freshness policies

On individual assets

You can configure a freshness policy directly on an asset:

from datetime import timedelta

from dagster import AssetSpec, asset
from dagster.preview.freshness import FreshnessPolicy

policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))


@asset(freshness_policy=policy)
def my_asset():
    pass


# Or on an asset spec
spec = AssetSpec("my_asset", freshness_policy=policy)

Across multiple assets

To apply freshness policies to multiple or all assets in your deployment, you can use map_asset_specs. Use map_resolved_asset_specs to apply a policy to an asset selection.

from datetime import timedelta

from dagster import Definitions, asset
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def parent_asset():
    pass


@asset(deps=[parent_asset])
def child_asset():
    pass


@asset
def asset_2():
    pass


policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))

defs = Definitions(assets=[parent_asset, child_asset, asset_2])

# Apply the policy to multiple assets (in this case, all assets in defs)
defs = defs.map_asset_specs(func=lambda spec: apply_freshness_policy(spec, policy))

# Use map_resolved_asset_specs to apply the policy to a selection
defs = defs.map_resolved_asset_specs(
    func=lambda spec: apply_freshness_policy(spec, policy),
    selection='key:"parent_asset"+',  # applies the policy to parent_asset and the assets downstream of it
)
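The selection string key:"parent_asset"+ picks parent_asset and extends the selection downstream through the dependency graph. The sketch below illustrates that idea over a plain dictionary of upstream dependencies; downstream_selection is a hypothetical helper, not how Dagster resolves selections. How many layers a trailing + covers is defined by Dagster's asset selection syntax, so the sketch takes the depth as a parameter (None means unlimited).

```python
from typing import Dict, List, Optional, Set


def downstream_selection(
    root: str, deps: Dict[str, List[str]], max_depth: Optional[int] = None
) -> Set[str]:
    """Return `root` plus assets downstream of it, up to `max_depth` layers.

    `deps` maps each asset key to the keys it depends on (its upstream parents).
    """
    # Invert the upstream map into a parent -> children lookup.
    children: Dict[str, List[str]] = {}
    for child, parents in deps.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)

    selected = {root}
    frontier = [root]
    depth = 0
    # Breadth-first walk, one dependency layer per iteration.
    while frontier and (max_depth is None or depth < max_depth):
        next_frontier = []
        for key in frontier:
            for child in children.get(key, []):
                if child not in selected:
                    selected.add(child)
                    next_frontier.append(child)
        frontier = next_frontier
        depth += 1
    return selected


# Mirrors the example: child_asset depends on parent_asset; asset_2 is unrelated.
deps = {"parent_asset": [], "child_asset": ["parent_asset"], "asset_2": []}
print(sorted(downstream_selection("parent_asset", deps, max_depth=1)))  # ['child_asset', 'parent_asset']
```

In this small graph, one layer and unlimited depth select the same two assets; asset_2 is never selected because nothing connects it to parent_asset.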

You can also use map_asset_specs directly on the asset specs before creating a Definitions object:

from datetime import timedelta

from dagster import Definitions, asset, map_asset_specs
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def asset_1():
    pass


@asset
def asset_2():
    pass


policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))

assets = [asset_1, asset_2]
assets_with_policies = map_asset_specs(
    func=lambda spec: apply_freshness_policy(spec, policy), iterable=assets
)

defs = Definitions(assets=assets_with_policies)

caution

Applying a freshness policy in this way to an asset with an existing freshness policy (for example, if it was defined in the @asset decorator) will overwrite the existing policy.

Setting a default freshness policy

Often, it's useful to set a default freshness policy across all assets, and override the default on individual assets.

To do so, use map_asset_specs and pass overwrite_existing=False to apply_freshness_policy in the mapped function, so that any pre-defined freshness policies are not overwritten:

from datetime import timedelta

from dagster import Definitions, asset
from dagster.preview.freshness import FreshnessPolicy, apply_freshness_policy


@asset
def default_asset():
    """This asset will have the default time window freshness policy applied to it."""
    pass


@asset(
    freshness_policy=FreshnessPolicy.cron(
        deadline_cron="0 10 * * *",
        lower_bound_delta=timedelta(hours=1),
    )
)
def override_asset():
    """This asset will override the default policy with a cron freshness policy."""
    pass


defs = Definitions(assets=[default_asset, override_asset])

default_policy = FreshnessPolicy.time_window(fail_window=timedelta(hours=24))

# This will apply default_policy to default_asset, but retain the cron policy on override_asset
defs = defs.map_asset_specs(
    func=lambda spec: apply_freshness_policy(
        spec, default_policy, overwrite_existing=False
    ),
)
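The effect of overwrite_existing can be modeled with a small immutable stand-in for an asset spec. SpecSketch and apply_policy_sketch below are hypothetical and use plain strings in place of real policy objects; the sketch only mirrors the documented behavior that an existing policy is replaced by default and kept when overwrite_existing=False.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class SpecSketch:
    """Hypothetical stand-in for an asset spec: a key and an optional policy label."""

    key: str
    freshness_policy: Optional[str] = None


def apply_policy_sketch(
    spec: SpecSketch, policy: str, overwrite_existing: bool = True
) -> SpecSketch:
    # Keep a pre-defined policy when overwrite_existing is False.
    if spec.freshness_policy is not None and not overwrite_existing:
        return spec
    # Specs are immutable here, so "applying" a policy returns a modified copy.
    return replace(spec, freshness_policy=policy)


specs: List[SpecSketch] = [
    SpecSketch("default_asset"),
    SpecSketch("override_asset", freshness_policy="cron: 0 10 * * *"),
]

kept = [apply_policy_sketch(s, "time_window: 24h", overwrite_existing=False) for s in specs]
clobbered = [apply_policy_sketch(s, "time_window: 24h") for s in specs]
print([s.freshness_policy for s in kept])  # ['time_window: 24h', 'cron: 0 10 * * *']
print([s.freshness_policy for s in clobbered])  # ['time_window: 24h', 'time_window: 24h']
```

Using frozen dataclasses and dataclasses.replace keeps the mapping side-effect free: each call returns either the original spec or a new copy, so the input list is never mutated.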

Limitations

Freshness policies are not currently supported for source observable assets (SourceAssets) and cacheable assets (CacheableAssetsDefinition).

Future enhancements

  • More freshness policy types, including:
    • Anomaly detection-based
    • Custom (user-defined) freshness
  • Support for source observable assets