dagster-dbt library
Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, and other tools in a single workflow. Dagster’s software-defined asset abstractions make it simple to define data assets that depend on specific dbt models, or define the computation required to compute the sources that your dbt models depend on.
For more information on using the dbt and dbt Cloud integrations, see the Dagster & dbt and Dagster & dbt Cloud docs.
DbtProjectComponent
class dagster_dbt.DbtProjectComponent [source]
Expose a dbt project to Dagster as a set of assets.
This component assumes that you have already set up a dbt project, for example, the dbt Jaffle shop. Run git clone --depth=1 https://github.com/dbt-labs/jaffle-shop.git jaffle_shop && rm -rf jaffle_shop/.git to copy that project into your Dagster project directory.
Scaffold a DbtProjectComponent definition by running dg scaffold defs dagster_dbt.DbtProjectComponent --project-path path/to/your/existing/dbt_project in the Dagster project directory.
Example:
# defs.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  project: "{{ project_root }}/path/to/dbt_project"
  cli_args:
    - build
- execute [source]
Executes the dbt command for the selected assets.
This method can be overridden in a subclass to customize the execution behavior, such as adding custom logging, modifying CLI arguments, or handling events differently.
Parameters:
- context – The asset execution context provided by Dagster
- dbt – The DbtCliResource used to execute dbt commands
Yields: Events from the dbt CLI execution (e.g., AssetMaterialization, AssetObservation)
Example:
Override this method to add custom logging before and after execution:
from dagster_dbt import DbtProjectComponent
import dagster as dg

class CustomDbtProjectComponent(DbtProjectComponent):
    def execute(self, context, dbt):
        context.log.info("Starting custom dbt execution")
        yield from super().execute(context, dbt)
        context.log.info("Completed custom dbt execution")
- get_asset_spec [source]
Generates an AssetSpec for a given dbt node.
This method can be overridden in a subclass to customize how dbt nodes are converted to Dagster asset specs. By default, it delegates to the configured DagsterDbtTranslator.
Parameters:
- manifest – The dbt manifest dictionary containing information about all dbt nodes
- unique_id – The unique identifier for the dbt node (e.g., “model.my_project.my_model”)
- project – The DbtProject object, if available
Returns: An AssetSpec that represents the dbt node as a Dagster asset
Example:
Override this method to add custom tags to all dbt models:
from dagster_dbt import DbtProjectComponent
import dagster as dg

class CustomDbtProjectComponent(DbtProjectComponent):
    def get_asset_spec(self, manifest, unique_id, project):
        base_spec = super().get_asset_spec(manifest, unique_id, project)
        return base_spec.replace_attributes(
            tags={**base_spec.tags, "custom_tag": "my_value"}
        )
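The unique_id passed to get_asset_spec follows dbt's dotted naming, as in the “model.my_project.my_model” example above. The following is a minimal sketch, assuming the common resource_type.package.name form, of how a subclass might inspect the identifier before deciding how to customize the spec. The names DbtNodeId and split_unique_id are hypothetical and are not part of dagster-dbt. Some dbt nodes carry additional dotted segments, which this sketch keeps together in the name field.

```python
from typing import NamedTuple


class DbtNodeId(NamedTuple):
    """Hypothetical container for the parts of a dbt unique_id."""

    resource_type: str
    package: str
    name: str


def split_unique_id(unique_id: str) -> DbtNodeId:
    """Split a dbt unique_id into resource type, package, and name.

    Only the first two dots are treated as separators, so any extra
    dotted segments stay inside `name`.
    """
    parts = unique_id.split(".", 2)
    if len(parts) != 3:
        raise ValueError(f"Unexpected dbt unique_id format: {unique_id!r}")
    return DbtNodeId(*parts)


node_id = split_unique_id("model.my_project.my_model")
```

An overridden get_asset_spec could, for instance, call such a helper and only add tags when resource_type is "model".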
To use the dbt component, see the dbt component integration guide.
Component YAML
When you scaffold a dbt component definition, the following defs.yaml configuration file will be created:
type: dagster_dbt.DbtProjectComponent
attributes:
  project: '{{ context.project_root }}/dbt'
DbtCloudComponent
class dagster_dbt.DbtCloudComponent [source]
Expose a dbt Cloud workspace to Dagster as a set of assets.
- get_asset_spec [source]
Generates an AssetSpec for a given dbt node.
This method can be overridden in a subclass to customize how dbt nodes are converted to Dagster asset specs. By default, it delegates to the configured DagsterDbtTranslator.
Parameters:
- manifest – The dbt manifest dictionary containing information about all dbt nodes.
- unique_id – The unique identifier for the dbt node (e.g., “model.my_project.my_model”).
- project – Always None for dbt Cloud (execution is remote).
Returns: An AssetSpec that represents the dbt node as a Dagster asset.
Example:
from dagster_dbt import DbtCloudComponent
import dagster as dg

class MyDbtCloudComponent(DbtCloudComponent):
    def get_asset_spec(self, manifest, unique_id, project):
        base_spec = super().get_asset_spec(manifest, unique_id, project)
        return base_spec.replace_attributes(
            tags={**base_spec.tags, "custom_tag": "my_value"}
        )
dagster-dbt
dagster-dbt project
Commands for using a dbt project in Dagster.
dagster-dbt project [OPTIONS] COMMAND [ARGS]...
prepare-and-package
This command will invoke prepare_and_package on DbtProject found in the target module or file.
Note that this command runs dbt deps and dbt parse.
dagster-dbt project prepare-and-package [OPTIONS]
Options:
- --file <file>
The file containing DbtProject definitions to prepare.
- --components <components>
The path to a dg project directory containing DbtProjectComponents.
scaffold
This command will initialize a new Dagster project and create directories and files that load assets from an existing dbt project.
dagster-dbt project scaffold [OPTIONS]
Options:
- --project-name <project_name>
Required. The name of the Dagster project to initialize for your dbt project.
- --dbt-project-dir <dbt_project_dir>
The path of your dbt project directory. This path must contain a dbt_project.yml file. By default, this command will assume that the current working directory contains a dbt project, but you can set a different directory by setting this option.
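Because the --dbt-project-dir path must contain a dbt_project.yml file, it can be useful to verify a candidate directory before running the scaffold command. The following is a small sketch of such a pre-flight check. The helper is_dbt_project_dir is hypothetical and not part of dagster-dbt, and the temporary directory only stands in for a real project.

```python
import tempfile
from pathlib import Path


def is_dbt_project_dir(path: Path) -> bool:
    """Return True if `path` is a directory that contains dbt_project.yml."""
    return path.is_dir() and (path / "dbt_project.yml").is_file()


# Demonstrate the check on a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    project_dir = Path(tmp)
    before = is_dbt_project_dir(project_dir)  # no dbt_project.yml yet
    (project_dir / "dbt_project.yml").write_text("name: demo\n")
    after = is_dbt_project_dir(project_dir)  # file now present
```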
dbt Core
Here, we provide interfaces to manage dbt projects invoked by the local dbt command line interface (dbt CLI).
Assets (dbt Core)
- @dagster_dbt.dbt_assets [source]
Create a definition for how to compute a set of dbt resources, described by a manifest.json. When invoking dbt commands using DbtCliResource’s cli() method, Dagster events are emitted by calling yield from on the event stream returned by stream().
Parameters:
- manifest (Union[Mapping[str, Any], str, Path]) – The contents of a manifest.json file or the path to a manifest.json file. A manifest.json contains a representation of a dbt project (models, tests, macros, etc). We use this representation to create corresponding Dagster assets.
- select (str) – A dbt selection string for the models in a project that you want to include. Defaults to fqn:*.
- exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.
- selector (Optional[str]) – A dbt selector for the models in a project that you want to include. Cannot be combined with select or exclude. Defaults to None.
- name (Optional[str]) – The name of the op.
- io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.
- partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.
- dagster_dbt_translator (Optional[DagsterDbtTranslator]) – Allows customizing how to map dbt models, seeds, etc. to asset keys and asset metadata.
- backfill_policy (Optional[BackfillPolicy]) – If a partitions_def is defined, this determines how to execute backfills that target multiple partitions. If a time window partition definition is used, this parameter defaults to a single-run policy.
- op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the assets. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
- required_resource_keys (Optional[Set[str]]) – Set of required resource handles.
- project (Optional[DbtProject]) – A DbtProject instance which provides a pointer to the dbt project location and manifest. Not required, but needed to attach code references from model code to Dagster assets.
- retry_policy (Optional[RetryPolicy]) – The retry policy for the op that computes the asset.
- pool (Optional[str]) – A string that identifies the concurrency pool that governs the dbt assets’ execution.
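The op_tags requirement that json.loads(json.dumps(value)) == value can be checked ahead of time. The following is a minimal sketch of that round-trip test using only the standard library. The helper name is_json_round_trippable is hypothetical, and this is an illustration of the stated criterion rather than Dagster's own validation code.

```python
import json
from typing import Any


def is_json_round_trippable(value: Any) -> bool:
    """Return True if `value` survives a JSON encode/decode unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Raised for values the json module cannot serialize, such as sets.
        return False


ok_value = {"team": "analytics", "retries": [1, 2, 3]}
tuple_value = ("a", "b")  # decodes back as a list, so it does not round-trip
int_key_value = {1: "x"}  # keys become strings after encoding
```

Tuples and dictionaries with non-string keys are typical values that serialize without error but fail the equality check, so converting them to lists and string keys first avoids surprises.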
Examples:
Running dbt build for a dbt project:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
Running dbt commands with flags:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "--full-refresh"], context=context).stream()
Running dbt commands with --vars:
import json
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_vars = {"key": "value"}
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
Retrieving dbt artifacts after running a dbt command:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)
    yield from dbt_build_invocation.stream()
    run_results_json = dbt_build_invocation.get_artifact("run_results.json")
Running multiple dbt commands for a dbt project:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context).stream()
Accessing the dbt event stream alongside the Dagster event stream:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_cli_invocation = dbt.cli(["build"], context=context)
    # Each dbt event is structured: https://docs.getdbt.com/reference/events-logging
    for dbt_event in dbt_cli_invocation.stream_raw_events():
        for dagster_event in dbt_event.to_default_asset_events(
            manifest=dbt_cli_invocation.manifest,
            dagster_dbt_translator=dbt_cli_invocation.dagster_dbt_translator,
            context=dbt_cli_invocation.context,
            target_path=dbt_cli_invocation.target_path,
        ):
            # Manipulate `dbt_event`
            ...
            # Then yield the Dagster event
            yield dagster_event
Customizing the Dagster asset definition metadata inferred from a dbt project using DagsterDbtTranslator:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    ...

@dbt_assets(
    manifest=Path("target", "manifest.json"),
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
Using a custom resource key for dbt:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, my_custom_dbt_resource_key: DbtCliResource):
    yield from my_custom_dbt_resource_key.cli(["build"], context=context).stream()
Using a dynamically generated resource key for dbt using required_resource_keys:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

dbt_resource_key = "my_custom_dbt_resource_key"

@dbt_assets(manifest=Path("target", "manifest.json"), required_resource_keys={dbt_resource_key})
def my_dbt_assets(context: AssetExecutionContext):
    dbt = getattr(context.resources, dbt_resource_key)
    yield from dbt.cli(["build"], context=context).stream()
Invoking another Dagster ResourceDefinition alongside dbt:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_slack import SlackResource

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, slack: SlackResource):
    yield from dbt.cli(["build"], context=context).stream()
    slack_client = slack.get_client()
    slack_client.chat_postMessage(channel="#my-channel", text="dbt build succeeded!")
Defining and accessing Dagster Config alongside dbt:
from pathlib import Path
from dagster import AssetExecutionContext, Config
from dagster_dbt import DbtCliResource, dbt_assets

class MyDbtConfig(Config):
    full_refresh: bool

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig):
    dbt_build_args = ["build"]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]
    yield from dbt.cli(dbt_build_args, context=context).stream()
Defining Dagster PartitionsDefinition alongside dbt:
import json
from pathlib import Path
from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest=Path("target", "manifest.json"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01")
)
def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    time_window = context.partition_time_window
    dbt_vars = {
        "min_date": time_window.start.isoformat(),
        "max_date": time_window.end.isoformat()
    }
    dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]
    yield from dbt.cli(dbt_build_args, context=context).stream()
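To see what the partitioned example hands to the dbt CLI, the argument construction can be isolated from Dagster. The following is a standard-library sketch, assuming a one-day window in UTC. The helper build_args_for_window is hypothetical, and the min_date and max_date variable names simply mirror the example above, so your dbt models would need to read those same variables.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import List


def build_args_for_window(start: datetime, end: datetime) -> List[str]:
    """Build dbt CLI arguments that pass a time window through --vars."""
    dbt_vars = {
        "min_date": start.isoformat(),
        "max_date": end.isoformat(),
    }
    # dbt accepts --vars as a single YAML/JSON string argument.
    return ["build", "--vars", json.dumps(dbt_vars)]


window_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
args = build_args_for_window(window_start, window_start + timedelta(days=1))
```

Keeping the variables in one JSON-encoded argument avoids shell quoting problems, since the list is passed to the CLI without further splitting.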
class dagster_dbt.DagsterDbtTranslator [source]
Holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc).
This class is exposed so that methods can be overridden to customize how Dagster asset metadata is derived.
- get_asset_key [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster asset key that represents that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom asset key for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: The Dagster asset key for the dbt resource.
Return type: AssetKey
Examples:
Adding a prefix to the default asset key generated for each dbt resource:
from typing import Any, Mapping
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("prefix")
Adding a prefix to the default asset key generated for each dbt resource, but only for dbt sources:
from typing import Any, Mapping
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)
        if dbt_resource_props["resource_type"] == "source":
            asset_key = asset_key.with_prefix("my_prefix")
        return asset_key
- get_auto_materialize_policy [source]
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.AutoMaterializePolicy for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom auto-materialize policy for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster auto-materialize policy.
Return type: Optional[AutoMaterializePolicy]
Examples:
Set a custom auto-materialize policy for all dbt resources:
from typing import Any, Mapping, Optional
from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        return AutoMaterializePolicy.eager()
Set a custom auto-materialize policy for dbt resources with a specific tag:
from typing import Any, Mapping, Optional
from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        auto_materialize_policy = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            auto_materialize_policy = AutoMaterializePolicy.eager()
        return auto_materialize_policy
- get_automation_condition [source]
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.AutomationCondition for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom AutomationCondition for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster automation condition.
Return type: Optional[AutomationCondition]
Examples:
Set a custom AutomationCondition for all dbt resources:
from typing import Any, Mapping, Optional
from dagster import AutomationCondition
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
        return AutomationCondition.eager()
Set a custom AutomationCondition for dbt resources with a specific tag:
from typing import Any, Mapping, Optional
from dagster import AutomationCondition
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
        automation_condition = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            automation_condition = AutomationCondition.eager()
        return automation_condition
- get_code_version [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster code version for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom code version for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster code version.
Return type: Optional[str]
Examples:
from typing import Any, Mapping, Optional
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_code_version(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        return dbt_resource_props["checksum"]["checksum"]
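The example above indexes directly into the checksum entry of the resource properties. The following is a defensive sketch, under the assumption that some resources may not carry a checksum entry, that returns None in that case instead of raising KeyError. The helper code_version_from_props is hypothetical and the sample dictionaries are made-up stand-ins for manifest entries.

```python
from typing import Any, Mapping, Optional


def code_version_from_props(dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
    """Return the file checksum as a code version, or None if absent."""
    checksum = dbt_resource_props.get("checksum") or {}
    return checksum.get("checksum")


# Made-up property dictionaries for illustration only.
model_props = {"name": "orders", "checksum": {"name": "sha256", "checksum": "abc123"}}
props_without_checksum = {"name": "raw_orders"}

model_version = code_version_from_props(model_props)
missing_version = code_version_from_props(props_without_checksum)
```

A get_code_version override could delegate to a helper like this one so that resources without a checksum fall back to no code version.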
- get_description [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster description for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom description for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: The description for the dbt resource.
Return type: str
Examples:
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        return "custom description"
- get_group_name [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster group name for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom group name for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A Dagster group name.
Return type: Optional[str]
Examples:
from typing import Any, Mapping, Optional
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        # Guard against resources that have no dbt group configured.
        group = dbt_resource_props.get("config", {}).get("group")
        return "custom_group_prefix" + group if group else None
- get_metadata [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster metadata for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide custom metadata for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A dictionary representing the Dagster metadata for the dbt resource.
Return type: Mapping[str, Any]
Examples:
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"custom": "metadata"}
- get_owners [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster owners for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide custom owners for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A sequence of Dagster owners.
Return type: Optional[Sequence[str]]
Examples:
from typing import Any, Mapping, Optional, Sequence
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]:
        return ["user@owner.com", "team:team@owner.com"]
- get_partition_mapping [source]
- beta
This API is currently in beta, and may have breaking changes in minor version releases, with behavior changes in patch releases.
A function that takes two dictionaries: the first, representing properties of a dbt resource; and the second, representing the properties of a parent dependency to the first dbt resource. The function returns the Dagster partition mapping for the dbt dependency.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom partition mapping for a dbt dependency.
Parameters:
- dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt child resource.
- dbt_parent_resource_props (Mapping[str, Any]) – A dictionary representing the dbt parent resource, in relationship to the child.
Returns: The Dagster partition mapping for the dbt resource. If None is returned, the default partition mapping will be used.
Return type: Optional[PartitionMapping]
- get_tags [source]
A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster tags for that resource.
Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
dbt tags are strings, but Dagster tags are key-value pairs. To bridge this divide, the dbt tag string is used as the Dagster tag key, and the Dagster tag value is set to the empty string, “”.
Any dbt tags that don’t match Dagster’s supported tag key format (e.g. they contain unsupported characters) will be ignored.
This method can be overridden to provide custom tags for a dbt resource.
Parameters: dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.
Returns: A dictionary representing the Dagster tags for the dbt resource.
Return type: Mapping[str, str]
Examples:
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        return {"custom": "tag"}
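The default mapping described above, where each dbt tag string becomes a Dagster tag key with an empty-string value and unsupported keys are dropped, can be sketched without Dagster. The pattern below is an assumption made for illustration only and is not Dagster's actual tag key rule. The helper dbt_tags_to_dagster_tags is likewise hypothetical.

```python
import re
from typing import Dict, Iterable

# Assumption: an illustrative key pattern, NOT Dagster's real validation rule.
_ASSUMED_TAG_KEY = re.compile(r"[A-Za-z0-9_.\-]{1,63}")


def dbt_tags_to_dagster_tags(dbt_tags: Iterable[str]) -> Dict[str, str]:
    """Map dbt tag strings to key-value tags with empty-string values.

    Tags that do not fully match the assumed key pattern are ignored.
    """
    return {tag: "" for tag in dbt_tags if _ASSUMED_TAG_KEY.fullmatch(tag)}


converted = dbt_tags_to_dagster_tags(["nightly", "has space", "pii"])
```

Here "has space" is dropped because it contains a character outside the assumed pattern, while the other two tags become keys with "" as their value.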
class dagster_dbt.DagsterDbtTranslatorSettings [source]
Settings to enable Dagster features for your dbt project.
Parameters:
- enable_asset_checks (bool) – Whether to load dbt tests as Dagster asset checks. Defaults to True.
- enable_duplicate_source_asset_keys (bool) – Whether to allow dbt sources with duplicate Dagster asset keys. Defaults to False.
- enable_code_references (bool) – Whether to enable Dagster code references for dbt resources. Defaults to False.
- enable_dbt_selection_by_name (bool) – Whether to enable selecting dbt resources by name, rather than fully qualified name. Defaults to False.
- enable_source_tests_as_checks (bool) – Whether to load dbt source tests as Dagster asset checks. Defaults to False. If False, asset observations will be emitted for source tests.
- enable_source_metadata (bool) – Whether to include metadata on AssetDep objects for dbt sources. If set to True, enables the ability to remap upstream asset keys based on table name. Defaults to True.
- enable_dbt_views_as_virtual_assets (bool) – Whether to treat dbt models with materialized: view as virtual assets. When enabled, view models will have is_virtual=True and "view" added to their kinds. Defaults to False.
class dagster_dbt.DbtManifestAssetSelection [source]
Defines a selection of assets from a dbt manifest wrapper and a dbt selection string.
Parameters:
- manifest (Mapping[str, Any]) – The dbt manifest blob.
- select (str) – A dbt selection string to specify a set of dbt resources.
- exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
Examples:
import json
from pathlib import Path
from dagster_dbt import DbtManifestAssetSelection
manifest = json.loads(Path("path/to/manifest.json").read_text())
# select the dbt assets that have the tag "foo".
my_selection = DbtManifestAssetSelection(manifest=manifest, select="tag:foo")
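To build intuition for what a selection such as tag:foo picks out, the following sketch filters a manifest dictionary by tag using plain Python. It is a deliberate simplification and not how DbtManifestAssetSelection is implemented: real dbt selection also supports graph operators, paths, and other methods. The helper unique_ids_with_tag and the tiny manifest are hypothetical, with only the nodes and tags keys modeled.

```python
from typing import Any, List, Mapping


def unique_ids_with_tag(manifest: Mapping[str, Any], tag: str) -> List[str]:
    """Return the sorted unique_ids of manifest nodes carrying `tag`."""
    nodes = manifest.get("nodes", {})
    return sorted(
        unique_id
        for unique_id, node in nodes.items()
        if tag in node.get("tags", [])
    )


# A made-up, heavily trimmed manifest for illustration.
example_manifest = {
    "nodes": {
        "model.demo.orders": {"tags": ["foo"]},
        "model.demo.customers": {"tags": []},
        "seed.demo.raw_orders": {"tags": ["foo", "bar"]},
    }
}

foo_ids = unique_ids_with_tag(example_manifest, "foo")
```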
- dagster_dbt.build_dbt_asset_selection [source]
Build an asset selection for a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.
Parameters:
- dbt_select (str) – A dbt selection string to specify a set of dbt resources.
- dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
Returns: An asset selection for the selected dbt nodes.
Return type: AssetSelection
Examples:
from dagster_dbt import dbt_assets, build_dbt_asset_selection

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

# Select the dbt assets that have the tag "foo".
foo_selection = build_dbt_asset_selection([all_dbt_assets], dbt_select="tag:foo")
# Select the dbt assets that have the tag "foo" and all Dagster assets downstream
# of them (dbt-related or otherwise)
foo_and_downstream_selection = foo_selection.downstream()
Building an asset selection on a dbt assets definition with an existing selection:
from dagster_dbt import dbt_assets, build_dbt_asset_selection

@dbt_assets(
    manifest=...,
    select="bar+",
)
def bar_plus_dbt_assets():
    ...

# Select the dbt assets that are in the intersection of having the tag "foo" and being
# in the existing selection "bar+".
bar_plus_and_foo_selection = build_dbt_asset_selection(
    [bar_plus_dbt_assets],
    dbt_select="tag:foo"
)
# Furthermore, select all assets downstream (dbt-related or otherwise).
bar_plus_and_foo_and_downstream_selection = bar_plus_and_foo_selection.downstream()
- dagster_dbt.build_schedule_from_dbt_selection [source]
Build a schedule to materialize a specified set of dbt resources from a dbt selection string.
See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.
Parameters:
- job_name (str) – The name of the job to materialize the dbt resources.
- cron_schedule (str) – The cron schedule to define the schedule.
- dbt_select (str) – A dbt selection string to specify a set of dbt resources.
- dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.
- dbt_selector (str) – A dbt selector to select resources to materialize.
- schedule_name (Optional[str]) – The name of the dbt schedule to create.
- tags (Optional[Mapping[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
- config (Optional[RunConfig]) – The config that parameterizes the execution of this schedule.
- execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
Returns: A definition to materialize the selected dbt resources on a cron schedule.
Return type: ScheduleDefinition
Examples:
from dagster_dbt import dbt_assets, build_schedule_from_dbt_selection

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
    [all_dbt_assets],
    job_name="all_dbt_assets",
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",
)
- dagster_dbt.get_asset_key_for_model [source]
Return the corresponding Dagster asset key for a dbt model, seed, or snapshot.
Parameters:
- dbt_assets (AssetsDefinition) – An AssetsDefinition object produced by @dbt_assets.
- model_name (str) – The name of the dbt model, seed, or snapshot.
Returns: The corresponding Dagster asset key.
Return type: AssetKey
Examples:
from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_model
@dbt_assets(manifest=...)
def all_dbt_assets():
    ...
@asset(deps={get_asset_key_for_model([all_dbt_assets], "customers")})
def cleaned_customers():
    ...
- dagster_dbt.get_asset_key_for_source [source]
Returns the corresponding Dagster asset key for a dbt source with a singular table.
Parameters: source_name (str) – The name of the dbt source.
Raises: DagsterInvalidInvocationError – If the source has more than one table.
Returns: The corresponding Dagster asset key.
Return type: AssetKey
Examples:
from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_source
@dbt_assets(manifest=...)
def all_dbt_assets():
    ...
@asset(key=get_asset_key_for_source([all_dbt_assets], "my_source"))
def upstream_python_asset():
    ...
- dagster_dbt.get_asset_keys_by_output_name_for_source [source]
Returns the corresponding Dagster asset keys for all tables in a dbt source.
This is a convenience method that makes it easy to define a multi-asset that generates all the tables for a given dbt source.
Parameters: source_name (str) – The name of the dbt source.
Returns: A mapping of the table name to corresponding Dagster asset key for all tables in the given dbt source.
Return type: Mapping[str, AssetKey]
Examples:
from dagster import AssetOut, multi_asset
from dagster_dbt import dbt_assets, get_asset_keys_by_output_name_for_source
@dbt_assets(manifest=...)
def all_dbt_assets():
    ...
@multi_asset(
    outs={
        name: AssetOut(key=asset_key)
        for name, asset_key in get_asset_keys_by_output_name_for_source(
            [all_dbt_assets], "raw_data"
        ).items()
    },
)
def upstream_python_asset():
    ...
class dagster_dbt.DbtProject [source]
Representation of a dbt project and related settings that assist with managing the project preparation.
Using this helps achieve a setup where the dbt manifest file and dbt dependencies are available and up-to-date:
- during development, pull the dependencies and reload the manifest at run time to pick up any changes.
- when deployed, expect a manifest that was created at build time to reduce start-up time.
The CLI command dagster-dbt project prepare-and-package can be used as part of the deployment process to handle the project preparation.
This object can be passed directly to DbtCliResource.
Parameters:
- project_dir (Union[str, Path]) – The directory of the dbt project.
- target_path (Union[str, Path]) – The path, relative to the project directory, to output artifacts. It corresponds to the target path in dbt. Default: “target”
- profiles_dir (Union[str, Path]) – The path to the directory containing your dbt profiles.yml. By default, the current working directory is used, which is the dbt project directory.
- profile (Optional[str]) – The profile from your dbt profiles.yml to use for execution, if it should be explicitly set.
- target (Optional[str]) – The target from your dbt profiles.yml to use for execution, if it should be explicitly set.
- packaged_project_dir (Optional[Union[str, Path]]) – A directory that will contain a copy of the dbt project and the manifest.json when the artifacts have been built. The prepare method will handle syncing the project_path to this directory. This is useful when the dbt project needs to be part of the Python package data, for example when deploying using PEX.
- state_path (Optional[Union[str, Path]]) – The path, relative to the project directory, to reference artifacts from another run.
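Because target_path and state_path are interpreted relative to project_dir, it can help to see how such paths resolve. The sketch below uses only pathlib and does not touch DbtProject itself. The directory names are placeholders, and the manifest.json location is an assumption based on dbt's usual layout of writing the manifest into the target directory.

```python
from pathlib import Path

project_dir = Path("path/to/dbt_project")  # placeholder project location
target_path = Path("target")               # the documented default
state_path = Path("target/managed_state")  # placeholder state directory

# Relative settings are joined onto the project directory.
resolved_target = project_dir / target_path
resolved_state = project_dir / state_path
# Assumption: dbt writes manifest.json into the target directory.
manifest_path = resolved_target / "manifest.json"
```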
Examples:
Creating a DbtProject by referencing the dbt project directory:
from pathlib import Path
from dagster_dbt import DbtProject
my_project = DbtProject(project_dir=Path("path/to/dbt_project"))
Creating a DbtProject that changes target based on environment variables and uses managed state artifacts:
import os
from pathlib import Path
from dagster_dbt import DbtProject
def get_env():
    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
        return "BRANCH"
    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "prod":
        return "PROD"
    return "LOCAL"
dbt_project = DbtProject(
    project_dir=Path("path/to/dbt_project"),
    state_path="target/managed_state",
    target=get_env(),
)
- prepare_if_dev [source]
Prepare a dbt project at run time during development, i.e. when dagster dev is used. This method has no effect outside this development context.
The preparation process ensures that the dbt manifest file and dbt dependencies are available and up-to-date. During development, it pulls the dependencies and reloads the manifest at run time to pick up any changes.
If this method returns successfully, self.manifest_path will point to a loadable manifest file. This method raises an error if the manifest file has not been correctly created by the preparation process.
Examples:
Preparing a DbtProject during development:
from pathlib import Path
from dagster import Definitions
from dagster_dbt import DbtCliResource, DbtProject
my_project = DbtProject(project_dir=Path("path/to/dbt_project"))
# Only has an effect when running under dagster dev.
my_project.prepare_if_dev()
Definitions(
    resources={
        "dbt": DbtCliResource(project_dir=my_project),
    },
    # ... other definitions
)
Asset Checks (dbt Core)
- dagster_dbt.build_freshness_checks_from_dbt_assets [source]
- superseded
This API has been superseded. Create FreshnessPolicy objects for your dbt models by overriding get_asset_spec in your DagsterDbtTranslator, or by updating the translation configuration of your DbtProjectComponent instead.
Returns a sequence of freshness checks constructed from the provided dbt assets.
Freshness checks can be configured on a per-model basis in the model schema configuration.
For assets that are not partitioned based on time, the freshness check configuration mirrors that of the build_last_update_freshness_checks() function. lower_bound_delta is provided in terms of seconds, and deadline_cron is optional.
For time-partitioned assets, the freshness check configuration mirrors that of the build_time_partition_freshness_checks() function.
Below is an example of configuring a non-time-partitioned dbt asset with a freshness check. This code would be placed in the schema.yml file for the dbt model.
models:
  - name: customers
    ...
    meta:
      dagster:
        freshness_check:
          lower_bound_delta_seconds: 86400 # 1 day
          deadline_cron: "0 0 * * *" # Optional
          severity: "WARN" # Optional, defaults to "WARN"
Below is an example of configuring a time-partitioned dbt asset with a freshness check. This code would be placed in the schema.yml file for the dbt model.
models:
  - name: customers
    ...
    meta:
      dagster:
        freshness_check:
          deadline_cron: "0 0 * * *"
          severity: "WARN" # Optional, defaults to "WARN"
Parameters: dbt_assets (Sequence[AssetsDefinition]) – A sequence of dbt assets to construct freshness checks from.
Returns: A sequence of asset checks definitions representing the freshness checks for the provided dbt assets.
Return type: Sequence[AssetChecksDefinition]
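As a rough illustration of what a lower_bound_delta_seconds setting expresses, the sketch below checks whether a last-update timestamp falls inside the allowed window. It is a simplified, hypothetical model and not Dagster's implementation: is_fresh and reference_time are names invented here, and reference_time stands in for the current time or, when a deadline cron is configured, the most recent deadline.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_update, lower_bound_delta_seconds, reference_time):
    """Pass when the last update is no older than the allowed window."""
    lower_bound = reference_time - timedelta(seconds=lower_bound_delta_seconds)
    return last_update >= lower_bound


# Hypothetical timestamps for illustration.
now = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
updated_recently = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
updated_long_ago = datetime(2023, 12, 30, 6, 0, tzinfo=timezone.utc)

# 86400 seconds is the 1-day window from the schema.yml example.
fresh = is_fresh(updated_recently, 86400, now)  # 18 hours old
stale = is_fresh(updated_long_ago, 86400, now)  # almost 3 days old
```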
Resources (dbt Core)
CLI resource
class dagster_dbt.DbtCliResource [source]
A resource used to execute dbt CLI commands.
Parameters:
- project_dir (str) – The path to the dbt project directory. This directory should contain a dbt_project.yml. See https://docs.getdbt.com/reference/dbt_project.yml for more information.
- global_config_flags (List[str]) – A list of global configuration flags to pass to the dbt CLI invocation. Invoke dbt --help to see a full list of global flags.
- profiles_dir (Optional[str]) – The path to the directory containing your dbt profiles.yml. By default, the current working directory is used, which is the dbt project directory. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
- profile (Optional[str]) – The profile from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
- target (Optional[str]) – The target from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.
- dbt_executable (str) – The path to the dbt executable. By default, this is dbt.
- state_path (Optional[str]) – The path, relative to the project directory, to a directory of dbt artifacts to be used with