from dataclasses import dataclass
from typing import Any, Mapping, Optional
from dagster import AssetKey, AutoMaterializePolicy, FreshnessPolicy
from dagster._annotations import public
from dagster._core.definitions.events import (
CoercibleToAssetKeyPrefix,
check_opt_coercible_to_asset_key_prefix_param,
)
from .asset_utils import (
default_asset_key_fn,
default_auto_materialize_policy_fn,
default_description_fn,
default_freshness_policy_fn,
default_group_from_dbt_resource_props,
default_metadata_from_dbt_resource_props,
)
[docs]class DagsterDbtTranslator:
"""Holds a set of methods that derive Dagster asset definition metadata given a representation
of a dbt resource (models, tests, sources, etc).
This class is exposed so that methods can be overriden to customize how Dagster asset metadata
is derived.
"""
[docs] @classmethod
@public
def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster asset key that represents that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom asset key for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
AssetKey: The Dagster asset key for the dbt resource.
Examples:
Adding a prefix to the default asset key generated for each dbt resource:
.. code-block:: python
from typing import Any, Mapping
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
return super().get_asset_key(dbt_resource_props).with_prefix("prefix")
Adding a prefix to the default asset key generated for each dbt resource, but only for dbt sources:
.. code-block:: python
from typing import Any, Mapping
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
asset_key = super().get_asset_key(dbt_resource_props)
if dbt_resource_props["resource_type"] == "source":
asset_key = asset_key.with_prefix("my_prefix")
return asset_key
"""
return default_asset_key_fn(dbt_resource_props)
[docs] @classmethod
@public
def get_description(cls, dbt_resource_props: Mapping[str, Any]) -> str:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster description for that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom description for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
str: The description for the dbt resource.
Examples:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_description(cls, dbt_resource_props: Mapping[str, Any]) -> str:
return "custom description"
"""
return default_description_fn(dbt_resource_props)
[docs] @classmethod
@public
def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster group name for that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom group name for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
Optional[str]: A Dagster group name.
Examples:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
return "custom_group_prefix" + dbt_resource_props.get("config", {}).get("group")
"""
return default_group_from_dbt_resource_props(dbt_resource_props)
[docs] @classmethod
@public
def get_freshness_policy(
cls, dbt_resource_props: Mapping[str, Any]
) -> Optional[FreshnessPolicy]:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster :py:class:`dagster.FreshnessPolicy` for that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom freshness policy for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
Optional[FreshnessPolicy]: A Dagster freshness policy.
Examples:
Set a custom freshness policy for all dbt resources:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_freshness_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
return FreshnessPolicy(maximum_lag_minutes=60)
Set a custom freshness policy for dbt resources with a specific tag:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_freshness_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
freshness_policy = None
if "my_custom_tag" in dbt_resource_props.get("tags", []):
freshness_policy = FreshnessPolicy(maximum_lag_minutes=60)
return freshness_policy
"""
return default_freshness_policy_fn(dbt_resource_props)
[docs] @classmethod
@public
def get_auto_materialize_policy(
cls, dbt_resource_props: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster :py:class:`dagster.AutoMaterializePolicy` for that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom auto-materialize policy for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
Optional[AutoMaterializePolicy]: A Dagster auto-materialize policy.
Examples:
Set a custom auto-materialize policy for all dbt resources:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_auto_materialize_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
return AutoMaterializePolicy.eager()
Set a custom auto-materialize policy for dbt resources with a specific tag:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_auto_materialize_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
auto_materialize_policy = None
if "my_custom_tag" in dbt_resource_props.get("tags", []):
auto_materialize_policy = AutoMaterializePolicy.eager()
return auto_materialize_policy
"""
return default_auto_materialize_policy_fn(dbt_resource_props)
class KeyPrefixDagsterDbtTranslator(DagsterDbtTranslator):
"""A DagsterDbtTranslator that applies prefixes to the asset keys generated from dbt resources.
Attributes:
asset_key_prefix (Optional[Union[str, Sequence[str]]]): A prefix to apply to all dbt models,
seeds, snapshots, etc. This will *not* apply to dbt sources.
source_asset_key_prefix (Optional[Union[str, Sequence[str]]]): A prefix to apply to all dbt
sources.
"""
def __init__(
self,
asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
source_asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
):
self._asset_key_prefix = (
check_opt_coercible_to_asset_key_prefix_param(asset_key_prefix, "asset_key_prefix")
or []
)
self._source_asset_key_prefix = (
check_opt_coercible_to_asset_key_prefix_param(
source_asset_key_prefix, "source_asset_key_prefix"
)
or []
)
@public
def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
base_key = default_asset_key_fn(dbt_resource_props)
if dbt_resource_props["resource_type"] == "source":
return base_key.with_prefix(self._source_asset_key_prefix)
else:
return base_key.with_prefix(self._asset_key_prefix)
@dataclass
class DbtManifestWrapper:
manifest: Mapping[str, Any]