You are viewing an unreleased or outdated version of the documentation

Source code for dagster_dbt.core.resources_v2

import contextlib
import copy
import os
import shutil
import signal
import subprocess
import sys
import uuid
from contextlib import suppress
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Union,
    cast,
)

import dateutil.parser
import orjson
from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetMaterialization,
    AssetObservation,
    AssetsDefinition,
    ConfigurableResource,
    OpExecutionContext,
    Output,
    get_dagster_logger,
)
from dagster._annotations import public
from dagster._config.pythonic_config.pydantic_compat_layer import compat_model_validator
from dagster._core.errors import DagsterExecutionInterruptedError, DagsterInvalidPropertyError
from dbt.contracts.results import NodeStatus, TestStatus
from dbt.node_types import NodeType
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, validator
from typing_extensions import Literal

from ..asset_utils import (
    dagster_name_fn,
    default_metadata_from_dbt_resource_props,
    get_manifest_and_translator_from_dbt_assets,
)
from ..dagster_dbt_translator import (
    DagsterDbtTranslator,
    validate_opt_translator,
    validate_translator,
)
from ..dbt_manifest import (
    DbtManifestParam,
    validate_manifest,
)
from ..errors import DagsterDbtCliRuntimeError
from ..utils import ASSET_RESOURCE_TYPES, get_dbt_resource_props_by_dbt_unique_id_from_manifest

logger = get_dagster_logger()


DBT_EXECUTABLE = "dbt"
DBT_PROJECT_YML_NAME = "dbt_project.yml"
DBT_PROFILES_YML_NAME = "profiles.yml"
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DAGSTER_DBT_TERMINATION_TIMEOUT_SECONDS = 2


def _get_dbt_target_path() -> Path:
    return Path(os.getenv("DBT_TARGET_PATH", "target"))


[docs]@dataclass class DbtCliEventMessage: """The representation of a dbt CLI event. Args: raw_event (Dict[str, Any]): The raw event dictionary. See https://docs.getdbt.com/reference/events-logging#structured-logging for more information. event_history_metadata (Dict[str, Any]): A dictionary of metadata about the current event, gathered from previous historical events. """ raw_event: Dict[str, Any] event_history_metadata: InitVar[Dict[str, Any]] def __post_init__(self, event_history_metadata: Dict[str, Any]): self._event_history_metadata = event_history_metadata def __str__(self) -> str: return self.raw_event["info"]["msg"] @property def log_level(self) -> str: """The log level of the event.""" return self.raw_event["info"]["level"]
[docs] @public def to_default_asset_events( self, manifest: DbtManifestParam, dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(), context: Optional[OpExecutionContext] = None, ) -> Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]: """Convert a dbt CLI event to a set of corresponding Dagster events. Args: manifest (Union[Mapping[str, Any], str, Path]): The dbt manifest blob. dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for linking dbt nodes to Dagster assets. Returns: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]: A set of corresponding Dagster events. In a Dagster asset definition, the following are yielded: - Output for refables (e.g. models, seeds, snapshots.) - AssetCheckResult for dbt test results that are enabled as asset checks. - AssetObservation for dbt test results that are not enabled as asset checks. In a Dagster op definition, the following are yielded: - AssetMaterialization for dbt test results that are not enabled as asset checks. - AssetObservation for dbt test results. """ if self.raw_event["info"]["name"] != "NodeFinished": return adapter_response = self.raw_event["data"].get("run_result", {}).get("adapter_response", {}) adapter_response_metadata = self._process_adapter_response_metadata(adapter_response) event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info") if not event_node_info: return dagster_dbt_translator = validate_translator(dagster_dbt_translator) manifest = validate_manifest(manifest) if not manifest: logger.info( "No dbt manifest was provided. Dagster events for dbt tests will not be created." ) unique_id: str = event_node_info["unique_id"] invocation_id: str = self.raw_event["info"]["invocation_id"] dbt_resource_props = manifest["nodes"][unique_id] default_metadata = { **default_metadata_from_dbt_resource_props(self._event_history_metadata), "unique_id": unique_id, "invocation_id": invocation_id, } has_asset_def: bool = bool(context and context.has_assets_def) node_resource_type: str = event_node_info["resource_type"] node_status: str = event_node_info["node_status"] node_materialization: str = self.raw_event["data"]["node_info"]["materialized"] is_node_ephemeral = node_materialization == "ephemeral" is_node_successful = node_status == NodeStatus.Success is_node_finished = bool(event_node_info.get("node_finished_at")) if ( node_resource_type in NodeType.refable() and is_node_successful and not is_node_ephemeral ): started_at = dateutil.parser.isoparse(event_node_info["node_started_at"]) finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"]) duration_seconds = (finished_at - started_at).total_seconds() if has_asset_def: yield Output( value=None, output_name=dagster_name_fn(event_node_info), metadata={ **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, }, ) else: dbt_resource_props = manifest["nodes"][unique_id] asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props) yield AssetMaterialization( asset_key=asset_key, metadata={ **default_metadata, "Execution Duration": duration_seconds, **adapter_response_metadata, }, ) elif manifest and node_resource_type == NodeType.Test and is_node_finished: upstream_unique_ids: List[str] = manifest["parent_map"][unique_id] test_resource_props = manifest["nodes"][unique_id] metadata = { **default_metadata, "status": node_status, **adapter_response_metadata, } is_asset_check = dagster_dbt_translator.settings.enable_asset_checks attached_node_unique_id = test_resource_props.get("attached_node") is_generic_test = bool(attached_node_unique_id) if has_asset_def and is_asset_check and is_generic_test: is_test_successful = node_status == TestStatus.Pass severity = ( AssetCheckSeverity.WARN if node_status == TestStatus.Warn else AssetCheckSeverity.ERROR ) attached_node_resource_props: Dict[str, Any] = manifest["nodes"].get( attached_node_unique_id ) or manifest["sources"].get(attached_node_unique_id) attached_node_asset_key = dagster_dbt_translator.get_asset_key( attached_node_resource_props ) yield AssetCheckResult( passed=is_test_successful, asset_key=attached_node_asset_key, check_name=event_node_info["node_name"], metadata=metadata, severity=severity, ) else: for upstream_unique_id in upstream_unique_ids: upstream_resource_props: Dict[str, Any] = manifest["nodes"].get( upstream_unique_id ) or manifest["sources"].get(upstream_unique_id) upstream_asset_key = dagster_dbt_translator.get_asset_key( upstream_resource_props ) yield AssetObservation( asset_key=upstream_asset_key, metadata=metadata, )
def _process_adapter_response_metadata( self, adapter_response: Dict[str, Any] ) -> Dict[str, Any]: """Process the adapter response metadata for a dbt CLI event. The main interface for AdapterResponse is found at https://github.com/dbt-labs/dbt-adapters. Currently, we pre-process the following dbt adapters, which have custom responses: - https://github.com/dbt-labs/dbt-bigquery: BigQueryAdapterResponse - https://github.com/databricks/dbt-databricks: DatabricksAdapterResponse - https://github.com/dbt-labs/dbt-snowflake: SnowflakeAdapterResponse - https://github.com/starburstdata/dbt-trino: TrinoAdapterResponse """ allowlisted_adapter_response_keys = [ # AdapterResponse *[ "rows_affected", ], # BigQueryAdapterResponse *[ "bytes_processed", "bytes_billed", "job_id", "slot_ms", ], # DatabricksAdapterResponse, SnowflakeAdapterResponse, TrinoAdapterResponse *[ "query_id", ], ] processed_adapter_response = { key: value for key, value in adapter_response.items() if (key in allowlisted_adapter_response_keys and value) } return processed_adapter_response
[docs]@dataclass class DbtCliInvocation: """The representation of an invoked dbt command. Args: process (subprocess.Popen): The process running the dbt command. manifest (Mapping[str, Any]): The dbt manifest blob. project_dir (Path): The path to the dbt project. target_path (Path): The path to the dbt target folder. raise_on_error (bool): Whether to raise an exception if the dbt command fails. """ process: subprocess.Popen manifest: Mapping[str, Any] dagster_dbt_translator: DagsterDbtTranslator project_dir: Path target_path: Path raise_on_error: bool log_level: Literal["info", "debug"] context: Optional[OpExecutionContext] = field(default=None, repr=False) termination_timeout_seconds: float = field( init=False, default=DAGSTER_DBT_TERMINATION_TIMEOUT_SECONDS ) _stdout: List[str] = field(init=False, default_factory=list) _error_messages: List[str] = field(init=False, default_factory=list) @classmethod def run( cls, args: List[str], env: Dict[str, str], manifest: Mapping[str, Any], dagster_dbt_translator: DagsterDbtTranslator, project_dir: Path, target_path: Path, raise_on_error: bool, log_level: Literal["info", "debug"], context: Optional[OpExecutionContext], ) -> "DbtCliInvocation": # Attempt to take advantage of partial parsing. If there is a `partial_parse.msgpack` in # in the target folder, then copy it to the dynamic target path. # # This effectively allows us to skip the parsing of the manifest, which can be expensive. # See https://docs.getdbt.com/reference/programmatic-invocations#reusing-objects for more # details. current_target_path = _get_dbt_target_path() partial_parse_file_path = ( current_target_path.joinpath(PARTIAL_PARSE_FILE_NAME) if current_target_path.is_absolute() else project_dir.joinpath(current_target_path, PARTIAL_PARSE_FILE_NAME) ) partial_parse_destination_target_path = target_path.joinpath(PARTIAL_PARSE_FILE_NAME) if partial_parse_file_path.exists() and not partial_parse_destination_target_path.exists(): logger.info( f"Copying `{partial_parse_file_path}` to `{partial_parse_destination_target_path}`" " to take advantage of partial parsing." ) partial_parse_destination_target_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(partial_parse_file_path, partial_parse_destination_target_path) # Create a subprocess that runs the dbt CLI command. process = subprocess.Popen( args=args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, cwd=project_dir, ) dbt_cli_invocation = cls( process=process, manifest=manifest, dagster_dbt_translator=dagster_dbt_translator, project_dir=project_dir, target_path=target_path, raise_on_error=raise_on_error, log_level=log_level, context=context, ) logger.info(f"Running dbt command: `{dbt_cli_invocation.dbt_command}`.") return dbt_cli_invocation
[docs] @public def wait(self) -> "DbtCliInvocation": """Wait for the dbt CLI process to complete. Returns: DbtCliInvocation: The current representation of the dbt CLI invocation. Examples: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource(project_dir="/path/to/dbt/project") dbt_cli_invocation = dbt.cli(["run"]).wait() """ list(self.stream_raw_events()) return self
[docs] @public def is_successful(self) -> bool: """Return whether the dbt CLI process completed successfully. Returns: bool: True, if the dbt CLI process returns with a zero exit code, and False otherwise. Examples: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource(project_dir="/path/to/dbt/project") dbt_cli_invocation = dbt.cli(["run"], raise_on_error=False) if dbt_cli_invocation.is_successful(): ... """ self._stdout = list(self._stream_stdout()) return self.process.wait() == 0
[docs] @public def stream( self, ) -> Iterator[ Union[ Output, AssetMaterialization, AssetObservation, AssetCheckResult, ] ]: """Stream the events from the dbt CLI process and convert them to Dagster events. Returns: Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]: A set of corresponding Dagster events. In a Dagster asset definition, the following are yielded: - Output for refables (e.g. models, seeds, snapshots.) - AssetCheckResult for dbt test results that are enabled as asset checks. - AssetObservation for dbt test results that are not enabled as asset checks. In a Dagster op definition, the following are yielded: - AssetMaterialization for dbt test results that are not enabled as asset checks. - AssetObservation for dbt test results. Examples: .. code-block:: python from pathlib import Path from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context, dbt: DbtCliResource): yield from dbt.cli(["run"], context=context).stream() """ for event in self.stream_raw_events(): yield from event.to_default_asset_events( manifest=self.manifest, dagster_dbt_translator=self.dagster_dbt_translator, context=self.context, )
[docs] @public def stream_raw_events(self) -> Iterator[DbtCliEventMessage]: """Stream the events from the dbt CLI process. Returns: Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process. """ event_history_metadata_by_unique_id: Dict[str, Dict[str, Any]] = {} for log in self._stdout or self._stream_stdout(): try: raw_event: Dict[str, Any] = orjson.loads(log) unique_id: Optional[str] = raw_event["data"].get("node_info", {}).get("unique_id") event_history_metadata: Dict[str, Any] = {} if unique_id and raw_event["info"]["name"] == "NodeFinished": event_history_metadata = copy.deepcopy( event_history_metadata_by_unique_id.get(unique_id, {}) ) event = DbtCliEventMessage( raw_event=raw_event, event_history_metadata=event_history_metadata ) is_error_message = event.log_level == "error" is_debug_message = event.log_level == "debug" is_debug_user_log_level = self.log_level == "debug" # Parse the error message from the event, if it exists. if is_error_message: self._error_messages.append(str(event)) # Attempt to parse the columns metadata from the event message. # If it exists, save it as historical metadata to attach to the NodeFinished event. if event.raw_event["info"]["name"] == "JinjaLogInfo": with contextlib.suppress(orjson.JSONDecodeError): columns = orjson.loads(event.raw_event["info"]["msg"]) event_history_metadata_by_unique_id[cast(str, unique_id)] = { "columns": columns } # Don't show this message in stdout continue # Only write debug logs to stdout if the user explicitly set # the log level to debug. if not is_debug_message or is_debug_user_log_level: sys.stdout.write(str(event) + "\n") sys.stdout.flush() yield event except: # If we can't parse the log, then just emit it as a raw log. sys.stdout.write(log + "\n") sys.stdout.flush() # Ensure that the dbt CLI process has completed. self._raise_on_error()
[docs] @public def get_artifact( self, artifact: Union[ Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"], ], ) -> Dict[str, Any]: """Retrieve a dbt artifact from the target path. See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information. Args: artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]): The name of the artifact to retrieve. Returns: Dict[str, Any]: The artifact as a dictionary. Examples: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource(project_dir="/path/to/dbt/project") dbt_cli_invocation = dbt.cli(["run"]).wait() # Retrieve the run_results.json artifact. run_results = dbt_cli_invocation.get_artifact("run_results.json") """ artifact_path = self.target_path.joinpath(artifact) return orjson.loads(artifact_path.read_bytes())
@property def dbt_command(self) -> str: """The dbt CLI command that was invoked.""" return " ".join(cast(Sequence[str], self.process.args)) def _stream_stdout(self) -> Iterator[str]: """Stream the stdout from the dbt CLI process.""" try: if not self.process.stdout or self.process.stdout.closed: return with self.process.stdout: for raw_line in self.process.stdout or []: log: str = raw_line.decode().strip() yield log except DagsterExecutionInterruptedError: logger.info(f"Forwarding interrupt signal to dbt command: `{self.dbt_command}`.") self.process.send_signal(signal.SIGINT) self.process.wait(timeout=self.termination_timeout_seconds) raise def _format_error_messages(self) -> str: """Format the error messages from the dbt CLI process.""" if not self._error_messages: return "" error_description = "\n".join(self._error_messages) return f"\n\nErrors parsed from dbt logs:\n{error_description}" def _raise_on_error(self) -> None: """Ensure that the dbt CLI process has completed. If the process has not successfully completed, then optionally raise an error. """ is_successful = self.is_successful() logger.info(f"Finished dbt command: `{self.dbt_command}`.") if not is_successful and self.raise_on_error: log_path = self.target_path.joinpath("dbt.log") extra_description = "" if log_path.exists(): extra_description = f", or view the dbt debug log: {log_path}" raise DagsterDbtCliRuntimeError( description=( f"The dbt CLI process with command `{self.dbt_command}` failed with exit code" f" {self.process.returncode}. Check the stdout in the Dagster compute logs for" f" the full information about the error{extra_description}." f"{self._format_error_messages()}" ), )
[docs]class DbtCliResource(ConfigurableResource): """A resource used to execute dbt CLI commands. Attributes: project_dir (str): The path to the dbt project directory. This directory should contain a `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more information. global_config_flags (List[str]): A list of global flags configuration to pass to the dbt CLI invocation. See https://docs.getdbt.com/reference/global-configs for a full list of configuration. profiles_dir (Optional[str]): The path to the directory containing your dbt `profiles.yml`. By default, the current working directory is used, which is the dbt project directory. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information. profile (Optional[str]): The profile from your dbt `profiles.yml` to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information. target (Optional[str]): The target from your dbt `profiles.yml` to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information. dbt_executable (str): The path to the dbt executable. By default, this is `dbt`. Examples: Creating a dbt resource with only a reference to ``project_dir``: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource(project_dir="/path/to/dbt/project") Creating a dbt resource with a custom ``profiles_dir``: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource( project_dir="/path/to/dbt/project", profiles_dir="/path/to/dbt/project/profiles", ) Creating a dbt resource with a custom ``profile`` and ``target``: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource( project_dir="/path/to/dbt/project", profiles_dir="/path/to/dbt/project/profiles", profile="jaffle_shop", target="dev", ) Creating a dbt resource with global configs, e.g. disabling colored logs with ``--no-use-color``: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource( project_dir="/path/to/dbt/project", global_config_flags=["--no-use-color"], ) Creating a dbt resource with custom dbt executable path: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource( project_dir="/path/to/dbt/project", dbt_executable="/path/to/dbt/executable", ) """ project_dir: str = Field( description=( "The path to your dbt project directory. This directory should contain a" " `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more" " information." ), ) global_config_flags: List[str] = Field( default=[], description=( "A list of global flags configuration to pass to the dbt CLI invocation. See" " https://docs.getdbt.com/reference/global-configs for a full list of configuration." ), ) profiles_dir: Optional[str] = Field( default=None, description=( "The path to the directory containing your dbt `profiles.yml`. By default, the current" " working directory is used, which is the dbt project directory." " See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for " " more information." ), ) profile: Optional[str] = Field( default=None, description=( "The profile from your dbt `profiles.yml` to use for execution. See" " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more" " information." ), ) target: Optional[str] = Field( default=None, description=( "The target from your dbt `profiles.yml` to use for execution. See" " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more" " information." ), ) dbt_executable: str = Field( default=DBT_EXECUTABLE, description="The path to the dbt executable.", ) @classmethod def _validate_absolute_path_exists(cls, path: Union[str, Path]) -> Path: absolute_path = Path(path).absolute() try: resolved_path = absolute_path.resolve(strict=True) except FileNotFoundError: raise ValueError(f"The absolute path of '{path}' ('{absolute_path}') does not exist") return resolved_path @classmethod def _validate_path_contains_file(cls, path: Path, file_name: str, error_message: str): if not path.joinpath(file_name).exists(): raise ValueError(error_message) @validator("project_dir", "profiles_dir", "dbt_executable", pre=True) def convert_path_to_str(cls, v: Any) -> Any: """Validate that the path is converted to a string.""" if isinstance(v, Path): resolved_path = cls._validate_absolute_path_exists(v) absolute_path = Path(v).absolute() try: resolved_path = absolute_path.resolve(strict=True) except FileNotFoundError: raise ValueError(f"The absolute path of '{v}' ('{absolute_path}') does not exist") return os.fspath(resolved_path) return v @validator("project_dir") def validate_project_dir(cls, project_dir: str) -> str: resolved_project_dir = cls._validate_absolute_path_exists(project_dir) cls._validate_path_contains_file( path=resolved_project_dir, file_name=DBT_PROJECT_YML_NAME, error_message=( f"{resolved_project_dir} does not contain a {DBT_PROJECT_YML_NAME} file. Please" " specify a valid path to a dbt project." ), ) return os.fspath(resolved_project_dir) @validator("profiles_dir") def validate_profiles_dir(cls, profiles_dir: Optional[str]) -> Optional[str]: if profiles_dir is None: return None resolved_profiles_dir = cls._validate_absolute_path_exists(profiles_dir) cls._validate_path_contains_file( path=resolved_profiles_dir, file_name=DBT_PROFILES_YML_NAME, error_message=( f"{resolved_profiles_dir} does not contain a {DBT_PROFILES_YML_NAME} file. Please" " specify a valid path to a dbt profile directory." ), ) return os.fspath(resolved_profiles_dir) @validator("dbt_executable") def validate_dbt_executable(cls, dbt_executable: str) -> str: resolved_dbt_executable = shutil.which(dbt_executable) if not resolved_dbt_executable: raise ValueError( f"The dbt executable '{dbt_executable}' does not exist. Please specify a valid" " path to a dbt executable." ) return dbt_executable @compat_model_validator(mode="before") def validate_dbt_version(cls, values: Dict[str, Any]) -> Dict[str, Any]: """Validate that the dbt version is supported.""" if version.parse(dbt_version) < version.parse("1.5.0"): raise ValueError( "To use `dagster_dbt.DbtCliResource`, you must use `dbt-core>=1.5.0`. Currently," f" you are using `dbt-core=={dbt_version}`. Please install a compatible dbt-core" " version." ) return values def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> Path: """Get a unique target path for the dbt CLI invocation. Args: context (Optional[OpExecutionContext]): The execution context. Returns: str: A unique target path for the dbt CLI invocation. """ unique_id = str(uuid.uuid4())[:7] path = unique_id if context: path = f"{context.op.name}-{context.run.run_id[:7]}-{unique_id}" current_target_path = _get_dbt_target_path() return current_target_path.joinpath(path)
[docs] @public def cli( self, args: List[str], *, raise_on_error: bool = True, manifest: Optional[DbtManifestParam] = None, dagster_dbt_translator: Optional[DagsterDbtTranslator] = None, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None, target_path: Optional[Path] = None, ) -> DbtCliInvocation: """Create a subprocess to execute a dbt CLI command. Args: args (List[str]): The dbt CLI command to execute. raise_on_error (bool): Whether to raise an exception if the dbt CLI command fails. manifest (Optional[Union[Mapping[str, Any], str, Path]]): The dbt manifest blob. If an execution context from within `@dbt_assets` is provided to the context argument, then the manifest provided to `@dbt_assets` will be used. dagster_dbt_translator (Optional[DagsterDbtTranslator]): The translator to link dbt nodes to Dagster assets. If an execution context from within `@dbt_assets` is provided to the context argument, then the dagster_dbt_translator provided to `@dbt_assets` will be used. context (Optional[Union[OpExecutionContext, AssetExecutionContext]]): The execution context from within `@dbt_assets`. If an AssetExecutionContext is passed, its underlying OpExecutionContext will be used. target_path (Optional[Path]): An explicit path to a target folder to use to store and retrieve dbt artifacts when running a dbt CLI command. If not provided, a unique target path will be generated. Returns: DbtCliInvocation: A invocation instance that can be used to retrieve the output of the dbt CLI command. Examples: Streaming Dagster events for dbt asset materializations and observations: .. code-block:: python from pathlib import Path from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["run"], context=context).stream() Retrieving a dbt artifact after streaming the Dagster events: .. code-block:: python from pathlib import Path from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_run_invocation = dbt.cli(["run"], context=context) yield from dbt_run_invocation.stream() # Retrieve the `run_results.json` dbt artifact as a dictionary: run_results_json = dbt_run_invocation.get_artifact("run_results.json") # Retrieve the `run_results.json` dbt artifact as a file path: run_results_path = dbt_run_invocation.target_path.joinpath("run_results.json") Customizing the asset materialization metadata when streaming the Dagster events: .. code-block:: python from pathlib import Path from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_cli_invocation = dbt.cli(["run"], context=context) for dagster_event in dbt_cli_invocation.stream(): if isinstance(dagster_event, Output): context.add_output_metadata( metadata={ "my_custom_metadata": "my_custom_metadata_value", }, output_name=dagster_event.output_name, ) yield dagster_event Suppressing exceptions from a dbt CLI command when a non-zero exit code is returned: .. code-block:: python from pathlib import Path from dagster import AssetExecutionContext from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_run_invocation = dbt.cli(["run"], context=context, raise_on_error=False) if dbt_run_invocation.is_successful(): yield from dbt_run_invocation.stream() else: ... Invoking a dbt CLI command in a custom asset or op: .. code-block:: python import json from dagster import asset, op from dagster_dbt import DbtCliResource @asset def my_dbt_asset(dbt: DbtCliResource): dbt_macro_args = {"key": "value"} dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait() @op def my_dbt_op(dbt: DbtCliResource): dbt_macro_args = {"key": "value"} dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait() """ dagster_dbt_translator = validate_opt_translator(dagster_dbt_translator) assets_def: Optional[AssetsDefinition] = None with suppress(DagsterInvalidPropertyError): assets_def = context.assets_def if context else None context = ( context.op_execution_context if isinstance(context, AssetExecutionContext) else context ) target_path = target_path or self._get_unique_target_path(context=context) env = { **os.environ.copy(), # Run dbt with unbuffered output. "PYTHONUNBUFFERED": "1", # Disable anonymous usage statistics for performance. "DBT_SEND_ANONYMOUS_USAGE_STATS": "false", # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this # environment variable to ensure that the dbt CLI outputs structured logs. "DBT_LOG_FORMAT": "json", # The DBT_DEBUG environment variable must be set to `true`. We use this # environment variable to ensure that the dbt CLI logs have enriched metadata. "DBT_DEBUG": "true", # The DBT_TARGET_PATH environment variable is set to a unique value for each dbt # invocation so that artifact paths are separated. # See https://discourse.getdbt.com/t/multiple-run-results-json-and-manifest-json-files/7555 # for more information. "DBT_TARGET_PATH": os.fspath(target_path), # The DBT_LOG_PATH environment variable is set to the same value as DBT_TARGET_PATH # so that logs for each dbt invocation has separate log files. "DBT_LOG_PATH": os.fspath(target_path), # The DBT_PROFILES_DIR environment variable is set to the path containing the dbt # profiles.yml file. # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory # for more information. **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}), } # Although we always set the dbt log level to debug, we only write those logs to stdout if # the user has explicitly set the log level to debug. log_level = ( "debug" if set(["--debug", "-d"]).intersection([*args, *self.global_config_flags]) else "info" ) selection_args: List[str] = [] dagster_dbt_translator = dagster_dbt_translator or DagsterDbtTranslator() if context and assets_def is not None: manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets( [assets_def] ) # When dbt is enabled with asset checks, we turn off any indirection with dbt selection. # This way, the Dagster context completely determines what is executed in a dbt # invocation. if dagster_dbt_translator.settings.enable_asset_checks: logger.info( "Setting environment variable `DBT_INDIRECT_SELECTION` to 'empty'. When dbt " "tests are modeled as asset checks, they are executed through explicit selection." ) env["DBT_INDIRECT_SELECTION"] = "empty" selection_args = get_subset_selection_for_context( context=context, manifest=manifest, select=context.op.tags.get("dagster-dbt/select"), exclude=context.op.tags.get("dagster-dbt/exclude"), dagster_dbt_translator=dagster_dbt_translator, ) else: manifest = validate_manifest(manifest) if manifest else {} # TODO: verify that args does not have any selection flags if the context and manifest # are passed to this function. profile_args: List[str] = [] if self.profile: profile_args = ["--profile", self.profile] if self.target: profile_args += ["--target", self.target] args = [ self.dbt_executable, *self.global_config_flags, *args, *profile_args, *selection_args, ] project_dir = Path(self.project_dir) if not target_path.is_absolute(): target_path = project_dir.joinpath(target_path) return DbtCliInvocation.run( args=args, env=env, manifest=manifest, dagster_dbt_translator=dagster_dbt_translator, project_dir=project_dir, target_path=target_path, raise_on_error=raise_on_error, log_level=log_level, context=context, )
def get_subset_selection_for_context( context: OpExecutionContext, manifest: Mapping[str, Any], select: Optional[str], exclude: Optional[str], dagster_dbt_translator: DagsterDbtTranslator, ) -> List[str]: """Generate a dbt selection string to materialize the selected resources in a subsetted execution context. See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work. Args: context (OpExecutionContext): The execution context for the current execution step. select (Optional[str]): A dbt selection string to select resources to materialize. exclude (Optional[str]): A dbt selection string to exclude resources from materializing. Returns: List[str]: dbt CLI arguments to materialize the selected resources in a subsetted execution context. If the current execution context is not performing a subsetted execution, return CLI arguments composed of the inputed selection and exclusion arguments. """ default_dbt_selection = [] if select: default_dbt_selection += ["--select", select] if exclude: default_dbt_selection += ["--exclude", exclude] dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest) dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest) # It's nice to use the default dbt selection arguments when not subsetting for readability. # However with asset checks, we make a tradeoff between readability and functionality for the user. # This is because we use an explicit selection of each individual dbt resource we execute to ensure # we control which models *and tests* run. By using explicit selection, we allow the user to # also subset the execution of dbt tests. if not context.is_subset and not dagster_dbt_translator.settings.enable_asset_checks: logger.info( "A dbt subsetted execution is not being performed. Using the default dbt selection" f" arguments `{default_dbt_selection}`." ) return default_dbt_selection selected_dbt_resources = [] for output_name in context.selected_output_names: dbt_resource_props = dbt_resource_props_by_output_name[output_name] # Explicitly select a dbt resource by its fully qualified name (FQN). # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method fqn_selector = f"fqn:{'.'.join(dbt_resource_props['fqn'])}" selected_dbt_resources.append(fqn_selector) for _, check_name in context.selected_asset_check_keys: test_resource_props = dbt_resource_props_by_test_name[check_name] # Explicitly select a dbt resource by its fully qualified name (FQN). # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method fqn_selector = f"fqn:{'.'.join(test_resource_props['fqn'])}" selected_dbt_resources.append(fqn_selector) # Take the union of all the selected resources. # https://docs.getdbt.com/reference/node-selection/set-operators#unions union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)] if context.is_subset: logger.info( "A dbt subsetted execution is being performed. Overriding default dbt selection" f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`" ) else: logger.info( "A dbt subsetted execution is not being performed. Because asset checks are enabled," f" converting the default dbt selection arguments `{default_dbt_selection}` with the " f"explicit set of models and tests: `{union_selected_dbt_resources}`" ) return union_selected_dbt_resources def get_dbt_resource_props_by_output_name( manifest: Mapping[str, Any], ) -> Mapping[str, Mapping[str, Any]]: node_info_by_dbt_unique_id = get_dbt_resource_props_by_dbt_unique_id_from_manifest(manifest) return { dagster_name_fn(node): node for node in node_info_by_dbt_unique_id.values() if node["resource_type"] in ASSET_RESOURCE_TYPES } def get_dbt_resource_props_by_test_name( manifest: Mapping[str, Any], ) -> Mapping[str, Mapping[str, Any]]: return { dbt_resource_props["name"]: dbt_resource_props for unique_id, dbt_resource_props in manifest["nodes"].items() if unique_id.startswith("test") }