Dagster allows you to define and execute data quality checks on your Software-defined Assets. Each asset check verifies some property of a data asset, e.g. that there are no null values in a particular column.
When viewing an asset in Dagster’s UI, you can see all of its checks, and whether they’ve passed, failed, or haven’t run.
The following code defines an asset named orders and an asset check named orders_id_has_no_nulls. When executed, the check verifies a property of the orders asset: that all the values in its primary key column are unique.
The orders_id_has_no_nulls check runs in its own op. That means that if you launch a run that materializes the orders asset and also executes the orders_id_has_no_nulls check and you’re using the multiprocess_executor, the check will execute in a separate process from the process that materializes the asset.
Checks that execute in the same op that materializes the asset#
Sometimes, it makes more sense for a single function to both materialize an asset and execute a check on it.
When defining an asset using the @asset or @multi_asset decorators, you can set the check_specs argument. Each provided AssetCheckSpec declares a check that the decorated function should yield an AssetCheckResult for.
import pandas as pd
from dagster import(
AssetCheckResult,
AssetCheckSpec,
AssetExecutionContext,
Definitions,
Output,
asset,)@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])deforders(context: AssetExecutionContext):
orders_df = pd.DataFrame({"order_id":[1,2],"item_id":[432,878]})# save the output and indicate that it's been saved
orders_df.to_csv("orders")yield Output(value=None)# check it
num_null_order_ids = orders_df["order_id"].isna().sum()yield AssetCheckResult(
passed=bool(num_null_order_ids ==0),)
defs = Definitions(assets=[orders])
You can optionally set AssetCheckSeverity on check results. The default severity is ERROR. Severity determines how the check result appears in the UI. If a check fails with ERROR severity, the asset will appear red in the lineage graph.
You can add information why a check passed or failed using the metadata argument on AssetCheckResult. We'll add num_null_order_ids as metadata to the orders_id_has_no_nulls example:
If you want to define many checks that are similar, you can use the factory pattern. Here's an example factory that accepts a list of sql statements and turns them in to asset checks.
from typing import Any, Mapping, Sequence
from mock import MagicMock
from dagster import(
AssetCheckResult,
AssetChecksDefinition,
Definitions,
asset,
asset_check,)@assetdeforders():...@assetdefitems():...defmake_check(check_blob: Mapping[str,str])-> AssetChecksDefinition:@asset_check(
name=check_blob["name"],
asset=check_blob["asset"],
required_resource_keys={"db_connection"},)def_check(context):
rows = context.resources.db_connection.execute(check_blob["sql"])return AssetCheckResult(passed=len(rows)==0, metadata={"num_rows":len(rows)})return _check
check_blobs =[{"name":"orders_id_has_no_nulls","asset":"orders","sql":"select * from orders where order_id is null",},{"name":"items_id_has_no_nulls","asset":"items","sql":"select * from items where item_id is null",},]
defs = Definitions(
assets=[orders, items],
asset_checks=[make_check(check_blob)for check_blob in check_blobs],
resources={"db_connection": MagicMock()},)
Materializing an asset from the UI will also execute any checks that are defined for that asset. You can also execute checks without materializing the asset from the Checks tab of the asset’s detail page.
You can use define_asset_job to define jobs that execute sets of both assets and checks, and then trigger those jobs via sensors or schedules. By default, checks are included with the assets they check. You can also define jobs that include only checks, or only assets.
from dagster import(
AssetSelection,
Definitions,
ScheduleDefinition,
asset,
asset_check,
define_asset_job,)@assetdefmy_asset():...@asset_check(asset=my_asset)defcheck_1():...@asset_check(asset=my_asset)defcheck_2():...# includes my_asset and both checks
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))# includes only my_asset
my_asset_only_job = define_asset_job("my_asset_only_job",
selection=AssetSelection.assets(my_asset).without_checks(),)# includes check_1 and check_2, but not my_asset
checks_only_job = define_asset_job("checks_only_job", selection=AssetSelection.checks_for_assets(my_asset))# includes only check_1
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
defs = Definitions(
assets=[my_asset],
asset_checks=[check_1, check_2],
jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
schedules=[basic_schedule],)
Dagster's UI is tested with a maximum of 1000 checks per asset. It's designed with the expectation that most assets will have fewer than 50 checks. If you have a use case that doesn't fit these limits, you can reach out to discuss.
Checks are currently only supported per-asset, not per-partition. See this issue for updates.