Dagster’s software-defined assets (SDAs) bear several similarities to dbt models. A software-defined asset contains an asset key, a set of upstream asset keys, and an operation that is responsible for computing the asset from its upstream dependencies. Models defined in a dbt project can be interpreted as Dagster SDAs:
The asset key for a dbt model is (by default) the name of the model.
The upstream dependencies of a dbt model are defined with ref or source calls within the model's definition.
The computation required to compute the asset from its upstream dependencies is the SQL within the model's definition.
These similarities make it natural to interact with dbt models as SDAs. Let’s take a look at a dbt model and an SDA, in code:
Here's what's happening in this example:
The first code block is a dbt model
As dbt models are named using file names, this model is named orders
The data for this model comes from a dependency named raw_orders
The second code block is a Dagster asset
The asset key corresponds to the name of the dbt model, orders
raw_orders is provided as an argument to the asset, defining it as a dependency
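The correspondence described above can be made concrete with a toy sketch in plain Python. It is only an illustration, not how dagster-dbt loads models (the library reads dbt's manifest rather than parsing SQL). `ToyAsset`, `toy_asset_from_model`, and the file path are hypothetical names chosen for this example.

```python
import re
from dataclasses import dataclass
from pathlib import PurePath
from typing import Tuple

# Matches {{ ref('name') }} or {{ ref("name") }}. This toy pattern ignores
# two-argument ref calls, source calls, and Jinja comments.
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")


@dataclass(frozen=True)
class ToyAsset:
    key: str                        # asset key: the model name
    upstream_keys: Tuple[str, ...]  # dependencies: the models named in ref calls
    sql: str                        # computation: the model's SQL


def toy_asset_from_model(path: str, sql: str) -> ToyAsset:
    # dbt names a model after its file, so the file stem becomes the key.
    name = PurePath(path).stem
    # dict.fromkeys removes duplicate refs while keeping their order.
    upstream = tuple(dict.fromkeys(REF_PATTERN.findall(sql)))
    return ToyAsset(key=name, upstream_keys=upstream, sql=sql)


orders = toy_asset_from_model(
    "models/orders.sql",
    "select * from {{ ref('raw_orders') }}",
)
```

Here `orders.key` is `orders` and `orders.upstream_keys` contains only `raw_orders`, mirroring the three parts of a software-defined asset.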
This creates a directory called project_dagster/ inside the current directory. The project_dagster/ directory contains a set of files that define a Dagster project that loads the dbt project at the path defined by --dbt-project-dir. The path to the dbt project must contain a dbt_project.yml.
The dagster-dbt library offers @dbt_assets to define Dagster assets for dbt models. It requires a dbt manifest, or manifest.json, to be created from your dbt project to parse your dbt project's representation.
The manifest can be created in two ways:
At run time: A dbt manifest is generated when your Dagster definitions are loaded, or
At build time: A dbt manifest is generated before loading your Dagster definitions and is included as part of your Python package.
When deploying your Dagster project to production, we recommend generating the manifest at build time to avoid the overhead of recompiling your dbt project every time your Dagster code is executed. A manifest.json should be precompiled and included in the Python package for your Dagster code.
In the Dagster project created by the dagster-dbt project scaffold command line interface, we offer you both ways to load your dbt models:
import os
from pathlib import Path

from dagster_dbt import DbtCliResource

dbt_project_dir = Path(__file__).joinpath("..", "..", "..").resolve()
dbt = DbtCliResource(project_dir=os.fspath(dbt_project_dir))

# If DAGSTER_DBT_PARSE_PROJECT_ON_LOAD is set, a manifest will be created at runtime.
# Otherwise, we expect a manifest to be present in the project's target directory.
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
    dbt_manifest_path = (
        dbt.cli(
            ["--quiet", "parse"],
            target_path=Path("target"),
        )
        .wait()
        .target_path.joinpath("manifest.json")
    )
else:
    dbt_manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
As the comment explains, the code gives you a choice about how to create this dbt manifest. Based on the DAGSTER_DBT_PARSE_PROJECT_ON_LOAD environment variable, either:
At run time: This code generates the manifest.json for you. This is the easiest option during development because you never need to worry about the file being out-of-date with your dbt project, or
At build time: This code leaves it up to you to generate the manifest.json file on your own, and this code just reads it.
When developing locally, you can run the following command to generate the manifest at run time for your dbt and Dagster project:
DAGSTER_DBT_PARSE_PROJECT_ON_LOAD=1 dagster dev
In production, DAGSTER_DBT_PARSE_PROJECT_ON_LOAD should be unset so that your project uses the precompiled manifest.
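Because the scaffolded code tests the variable with `os.getenv`, any non-empty value enables run-time parsing, including values such as `0` or `false`. The sketch below mirrors that check and adds a fail-fast guard for the build-time path. The helper names `parse_on_load_enabled` and `require_prebuilt_manifest` are hypothetical and not part of dagster-dbt.

```python
import os
from pathlib import Path
from typing import Mapping


def parse_on_load_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    # Same truthiness rule as `if os.getenv(...)`: any non-empty string counts.
    return bool(environ.get("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"))


def require_prebuilt_manifest(dbt_project_dir: Path) -> Path:
    # Build-time path: the manifest must already exist in the target directory.
    manifest_path = dbt_project_dir.joinpath("target", "manifest.json")
    if not manifest_path.is_file():
        raise FileNotFoundError(
            f"{manifest_path} not found; run `dbt parse` in {dbt_project_dir} "
            "or set DAGSTER_DBT_PARSE_PROJECT_ON_LOAD for local development."
        )
    return manifest_path
```

A guard like this turns a missing manifest in production into an explicit error message rather than a failure deeper in definition loading.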
Got questions about our recommendations or something to add? Join our GitHub discussion to share how you deploy your Dagster code with your dbt project.
When deploying your Dagster project to production, your dbt project must be present alongside your Dagster project so that dbt commands can be executed. As a result, we recommend that you set up your continuous integration and continuous deployment (CI/CD) workflows to package the dbt project with your Dagster project.
Deploying a dbt project from a separate git repository
If you are managing your Dagster project in a separate git repository from your dbt project, you should include the following steps in your CI/CD workflows.
In your CI/CD workflows for your Dagster project:
Include any secrets that are required by your dbt project in your CI/CD environment.
Clone the dbt project repository as a subdirectory of your Dagster project.
Run dbt deps to build your dbt project's dependencies.
Run dbt parse to create a dbt manifest for your Dagster project.
In the CI/CD workflows for your dbt project, set up a dispatch action to trigger a deployment of your Dagster project when your dbt project changes.
With Dagster Cloud, we streamline this option. As part of our Dagster Cloud onboarding for dbt users, we can automatically create a Dagster project in an existing dbt project repository.
If you are managing your Dagster project in the same git repository as your dbt project, you should include the following steps in your CI/CD workflows.
In your CI/CD workflows for your Dagster and dbt project:
Include any secrets that are required by your dbt project in your CI/CD environment.
Run dbt deps to build your dbt project's dependencies.
Run dbt parse to create a dbt manifest for your Dagster project.
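The CI/CD steps for both repository layouts can be sketched as a small Python helper that assembles the commands. This is a sketch under assumptions: the repository URL and directory name are placeholders, `ci_commands` and `run_steps` are hypothetical helpers, and `git` and `dbt` are expected to be on the PATH with any required secrets already exported in the environment.

```python
import subprocess
from typing import List, Optional


def ci_commands(dbt_project_dir: str, dbt_repo_url: Optional[str] = None) -> List[List[str]]:
    commands: List[List[str]] = []
    if dbt_repo_url is not None:
        # Separate repositories: clone the dbt project into a subdirectory first.
        commands.append(["git", "clone", "--depth", "1", dbt_repo_url, dbt_project_dir])
    # Install the dbt project's package dependencies.
    commands.append(["dbt", "deps", "--project-dir", dbt_project_dir])
    # Write target/manifest.json so Dagster can load it at build time.
    commands.append(["dbt", "parse", "--project-dir", dbt_project_dir])
    return commands


def run_steps(commands: List[List[str]]) -> None:
    for command in commands:
        # check=True stops at the first failing step.
        subprocess.run(command, check=True)


# Placeholder values, for illustration only.
separate_repo_steps = ci_commands("dbt_project", "https://example.com/org/dbt_project.git")
same_repo_steps = ci_commands("dbt_project")
```

In a CI job you would call `run_steps(separate_repo_steps)` or `run_steps(same_repo_steps)` before packaging the Dagster code, so that the manifest ships with the Python package.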
In this example, we use the build_schedule_from_dbt_selection function to create a job, daily_dbt_models, as well as a schedule which will execute this job once a day. We define the set of models we'd like to execute using dbt's selection syntax, in this case selecting only the models with the tag daily.
Scheduling jobs that contain dbt assets and non-dbt assets
In many cases, it's useful to be able to schedule dbt assets alongside non-dbt assets. In this example, we build an AssetSelection of dbt assets using build_dbt_asset_selection, then select all assets (dbt-related or not) which are downstream of these dbt models. From there, we create a job that targets that selection of assets and schedule it to run daily.
from dagster import define_asset_job, ScheduleDefinition
from dagster_dbt import build_dbt_asset_selection, dbt_assets

@dbt_assets(manifest=manifest)
def my_dbt_assets():
    ...

# selects all models tagged with "daily", and all their downstream asset dependencies
daily_selection = build_dbt_asset_selection(
    [my_dbt_assets], dbt_select="tag:daily"
).downstream()

daily_dbt_assets_and_downstream_schedule = ScheduleDefinition(
    job=define_asset_job("daily_assets", selection=daily_selection),
    cron_schedule="@daily",
)
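To make the selection semantics concrete, here is a toy model of "tagged dbt models plus everything downstream of them" over a plain dependency graph. It does not use Dagster's AssetSelection; the asset names, the tag set, and the `downstream_of` helper are invented for illustration.

```python
from collections import deque
from typing import Dict, Set

# asset -> its direct upstream assets (a hypothetical graph)
UPSTREAMS: Dict[str, Set[str]] = {
    "raw_orders": set(),
    "orders": {"raw_orders"},          # dbt model tagged "daily"
    "customers": set(),                # dbt model without the tag
    "orders_report": {"orders"},       # non-dbt asset
    "customer_emails": {"customers"},  # non-dbt asset
}
DAILY_TAGGED = {"orders"}


def downstream_of(roots: Set[str], upstreams: Dict[str, Set[str]]) -> Set[str]:
    # Breadth-first walk from the roots to every asset that depends on them.
    selected = set(roots)
    queue = deque(roots)
    while queue:
        current = queue.popleft()
        for asset, deps in upstreams.items():
            if current in deps and asset not in selected:
                selected.add(asset)
                queue.append(asset)
    return selected


daily_selection = downstream_of(DAILY_TAGGED, UPSTREAMS)
```

In this toy graph the selection contains `orders` and `orders_report`: the tagged model itself plus the non-dbt asset that depends on it, but nothing upstream and nothing unrelated.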
In Dagster, each asset definition has attributes. Dagster automatically generates these attributes for each software-defined asset loaded from the dbt project. These attributes can optionally be overridden by the user.
Overriding Dagster's asset key generation by implementing a custom DagsterDbtTranslator.
To override an asset key generated by Dagster for a dbt node, you can define a meta key on your dbt node's .yml file. The following example overrides the asset key for a source and table as snowflake/jaffle_shop/orders:
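The resolution order can be sketched in plain Python: an explicit key in the node's meta wins, otherwise a default is derived from the node itself. This is a simplified illustration, not the library's implementation. The `meta` → `dagster` → `asset_key` layout is an assumption about the convention, and `toy_asset_key` is a hypothetical helper.

```python
from typing import Any, List, Mapping


def toy_asset_key(dbt_resource_props: Mapping[str, Any]) -> List[str]:
    # Assumed convention: an explicit key lives under meta -> dagster -> asset_key.
    override = dbt_resource_props.get("meta", {}).get("dagster", {}).get("asset_key")
    if override:
        return list(override)
    # Defaults: sources are keyed by source and table name, other nodes by name.
    if dbt_resource_props.get("resource_type") == "source":
        return [dbt_resource_props["source_name"], dbt_resource_props["name"]]
    return [dbt_resource_props["name"]]


source_props = {"resource_type": "source", "source_name": "jaffle_shop", "name": "orders"}
overridden_props = {
    **source_props,
    "meta": {"dagster": {"asset_key": ["snowflake", "jaffle_shop", "orders"]}},
}
```

With these inputs, `toy_asset_key(source_props)` yields the components `jaffle_shop`, `orders`, while `toy_asset_key(overridden_props)` yields `snowflake`, `jaffle_shop`, `orders`.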
Alternatively, to override the asset key generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_asset_key. The following example adds a snowflake prefix to the default generated asset key:
from pathlib import Path
from dagster import AssetKey, AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("snowflake")

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
Overriding Dagster's group name generation by implementing a custom DagsterDbtTranslator
To override the group name generated by Dagster for a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. The following example overrides the Dagster group name for the following model as marketing:
Alternatively, to override the Dagster group name generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_group_name. The following example defines snowflake as the group name for all dbt nodes:
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[str]:
        return "snowflake"

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
For dbt models, seeds, and snapshots, the default Dagster description will be the dbt node's description.
To override the Dagster description for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_description. The following example defines the raw SQL of the dbt node as the Dagster description:
import textwrap
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        return textwrap.indent(dbt_resource_props.get("raw_sql", ""), "\t")

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
For dbt models, seeds, and snapshots, the default Dagster metadata will be the dbt node's declared column schema.
To override the Dagster metadata for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_metadata. The following example defines the metadata of the dbt node as the Dagster metadata, using MetadataValue:
from pathlib import Path
from dagster import MetadataValue, AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_metadata(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        return {
            "dbt_metadata": MetadataValue.json(dbt_resource_props.get("meta", {}))
        }

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
Overriding Dagster's auto-materialize policy generation by implementing a custom DagsterDbtTranslator.
To add an AutoMaterializePolicy to a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This policy may be one of two types, eager or lazy. The following example provides an eager AutoMaterializePolicy for the following model:
Overriding Dagster's freshness policy generation by implementing a custom DagsterDbtTranslator.
To add a FreshnessPolicy to a dbt node, you can define a meta key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This config accepts identical arguments to the FreshnessPolicy class. The following example applies a FreshnessPolicy for the following model:
Alternatively, to override the Dagster freshness policy generation for all dbt nodes in your dbt project, you can create a custom DagsterDbtTranslator and implement DagsterDbtTranslator.get_freshness_policy. The following example defines a FreshnessPolicy with maximum_lag_minutes=60 as the freshness policy for all dbt nodes:
from pathlib import Path
from dagster import AssetExecutionContext, FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets
from typing import Any, Mapping, Optional

manifest_path = Path("path/to/dbt_project/target/manifest.json")

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_freshness_policy(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        return FreshnessPolicy(maximum_lag_minutes=60)

@dbt_assets(
    manifest=manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
Note that Dagster allows the optional specification of a code_version for each software-defined asset, which is used to track changes. The code_version for an asset arising from a dbt model is defined automatically as the hash of the SQL defining the dbt model. This allows the asset graph in the UI to indicate which dbt models have new SQL since they were last materialized.
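The idea can be illustrated with a few lines of standard-library Python. This is a sketch of change detection by hashing, not the library's code: the choice of SHA-1 and the helper name `toy_code_version` are assumptions made for the example.

```python
import hashlib


def toy_code_version(raw_sql: str) -> str:
    # SHA-1 is an assumption for illustration; any stable hash of the SQL
    # text is enough to detect that the model definition has changed.
    return hashlib.sha1(raw_sql.encode("utf-8")).hexdigest()


v1 = toy_code_version("select * from {{ ref('raw_orders') }}")
v2 = toy_code_version("select * from {{ ref('raw_orders') }} where amount > 0")
```

Because `v1` and `v2` differ, a system that stored `v1` at the last materialization can tell that the model's SQL has since been edited. Note that a hash of the text also changes on whitespace-only edits.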
Dagster asset checks are currently an experimental feature. To provide feedback, join our GitHub discussion to share your use case with Dagster asset checks and dbt.
Dagster allows you to model your existing dbt tests as asset checks.
Dagster allows you to emit column schema materialization metadata, which includes the column names and data types of your materialized dbt models, seeds, and snapshots.
With this metadata, you can view documentation in Dagster for all columns, not just columns described in your dbt project.
To collect this metadata for your dbt project, specify the dagster dbt package in your packages.yml or dependencies.yml, and then run dbt deps.
packages:
  - git: "https://github.com/dagster-io/dagster.git"
    subdirectory: "python_modules/libraries/dagster-dbt/dbt_packages/dagster"
    revision: DAGSTER_VERSION # replace with the version of `dagster` you are using.
Then, enable the dagster.log_columns_in_relation() macro as a post-hook for the dbt resources that should emit column schema metadata. For example, adding the following snippet in dbt_project.yml enables this macro for all dbt models, seeds, and snapshots:
Defining an asset as an upstream dependency of a dbt model
Dagster allows you to define existing assets as upstream dependencies of dbt models. For example, say you have the following asset with asset key upstream:
from dagster import asset

@asset
def upstream():
    ...
In order to define this asset as an upstream dependency for a dbt model, you'll need to first declare it as a data source in the sources.yml file. Here, you can explicitly provide your asset key to a source table:
Dagster parses information about assets that are upstream of specific dbt models from the dbt project itself. Whenever a model is downstream of a dbt source, that source will be parsed as an upstream asset.
For example, if you defined a source in your sources.yml file like this:
sources:
  - name: jaffle_shop
    tables:
      - name: orders
and use it in a model:
select * from {{ source("jaffle_shop", "orders") }}
where foo = 1
Then this model has an upstream source with the jaffle_shop/orders asset key.
In order to manage this upstream asset with Dagster, you can define it by passing the key into an asset definition via get_asset_key_for_source:
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource, get_asset_key_for_source, dbt_assets

@dbt_assets(manifest=MANIFEST_PATH)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    ...

@asset(key=get_asset_key_for_source([my_dbt_assets], "jaffle_shop"))
def orders():
    return ...
This allows you to change asset keys within your dbt project without having to update the corresponding Dagster definitions.
The get_asset_key_for_source method is used when a source has only one table. However, if a source contains multiple tables, like this example:
sources:
  - name: clients_data
    tables:
      - name: names
      - name: history
Dagster allows you to define assets that are downstream of specific dbt models via get_asset_key_for_model. The example below defines my_downstream_asset as a downstream dependency of my_dbt_model:
from dagster_dbt import get_asset_key_for_model
from dagster import asset

@asset(deps=[get_asset_key_for_model([my_dbt_assets], "my_dbt_model")])
def my_downstream_asset():
    ...
In the downstream asset, you may want direct access to the contents of the dbt model. To do so, you can customize the code within your @asset-decorated function to load upstream data.
Dagster alternatively allows you to delegate loading data to an I/O manager. For example, if you wanted to consume a dbt model with the asset key my_dbt_model as a Pandas dataframe, that would look something like the following:
from dagster_dbt import get_asset_key_for_model
from dagster import AssetIn, asset

@asset(
    ins={
        "my_dbt_model": AssetIn(
            input_manager_key="pandas_df_manager",
            key=get_asset_key_for_model([my_dbt_assets], "my_dbt_model"),
        )
    },
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe
    # keep only the rows where the foo column equals "bar"
    return my_dbt_model[my_dbt_model["foo"] == "bar"]
You can define a Dagster PartitionsDefinition alongside dbt in order to build incremental models.
Partitioned assets will be able to access the TimeWindow's start and end dates, and these can be passed to dbt's CLI as variables which can be used to filter incremental models.
When a partitions definition is passed to the @dbt_assets decorator, all assets are defined to operate on the same partitions. With this in mind, we can read the AssetExecutionContext.partition_time_window property to get the start and end of the current partition.
import json
from pathlib import Path

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(
    manifest=Path("target", "manifest.json"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
)
def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The time window of the partition being materialized.
    start, end = context.partition_time_window

    # Pass the window bounds to dbt as variables.
    dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]

    yield from dbt.cli(dbt_build_args, context=context).stream()
With the variables defined, we can now reference min_date and max_date in our SQL and configure the dbt model as incremental. Here, we define an incremental run to operate on rows with order_date that is between our min_date and max_date.
-- Configure the model as incremental
{{ config(materialized='incremental') }}

select * from {{ ref('my_model') }}

-- Use the Dagster partition variables to filter rows on an incremental run
{% if is_incremental() %}
where order_date >= '{{ var('min_date') }}' and order_date <= '{{ var('max_date') }}'
{% endif %}
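To see exactly what dbt receives for a given partition, the argument construction can be reproduced with the standard library alone. This is a sketch that assumes a UTC daily window; `dbt_build_args_for_window` is a hypothetical helper, not part of dagster-dbt.

```python
import json
from datetime import datetime, timezone
from typing import List


def dbt_build_args_for_window(start: datetime, end: datetime) -> List[str]:
    # Serialize the window bounds as ISO 8601 strings for dbt's --vars flag.
    dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    return ["build", "--vars", json.dumps(dbt_vars)]


args = dbt_build_args_for_window(
    datetime(2023, 1, 1, tzinfo=timezone.utc),
    datetime(2023, 1, 2, tzinfo=timezone.utc),
)
print(args[2])
# {"min_date": "2023-01-01T00:00:00+00:00", "max_date": "2023-01-02T00:00:00+00:00"}
```

Two details are worth checking against your warehouse. The values are full timestamps with a UTC offset, so comparing them with a date-typed order_date column depends on how the database casts strings. And because the end of one daily window is the start of the next, the inclusive `<=` in the SQL means a row stamped exactly at midnight can fall into two adjacent partitions; a strict `<` on max_date avoids that.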