Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing 'sub_type' to lineage nodes. #1361

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions elementary/monitor/api/lineage/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

from elementary.clients.api.api_client import APIClient
from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.monitor.api.lineage.schema import LineageNodeSchema, LineageSchema
from elementary.monitor.api.lineage.schema import (
LineageNodeSchema,
LineageSchema,
NodeDependsOnNodesSchema,
)
from elementary.monitor.fetchers.lineage.lineage import LineageFetcher
from elementary.monitor.fetchers.lineage.schema import NodeDependsOnNodesSchema


class LineageAPI(APIClient):
Expand Down Expand Up @@ -38,6 +41,10 @@ def _convert_depends_on_node_to_lineage_node(
nodes_depends_on_nodes: List[NodeDependsOnNodesSchema],
) -> List[LineageNodeSchema]:
return [
LineageNodeSchema(type=node.type, id=node.unique_id)
LineageNodeSchema(
id=node.unique_id,
type=node.type,
sub_type=node.sub_type,
)
for node in nodes_depends_on_nodes
]
29 changes: 29 additions & 0 deletions elementary/monitor/api/lineage/schema.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from typing import List, Optional, Tuple

import networkx as nx
Expand All @@ -6,11 +7,16 @@

# Alias for the string identifier used for lineage nodes.
NodeUniqueIdType = str
# Closed set of values accepted for a node's `type` field.
NodeType = Literal["model", "source", "exposure"]
# Closed set of values accepted for a node's optional `sub_type` field.
NodeSubType = Literal["table", "view"]


# Matches node ids that start with the literal prefix "seed.".
_SEED_PATH_PATTERN = re.compile(r"^seed\.")


# Schema of a single node in the lineage graph.
class LineageNodeSchema(BaseModel):
# Unique identifier of the node.
id: NodeUniqueIdType
# One of "model", "source" or "exposure".
type: NodeType
# "table" or "view" when known; defaults to None when not provided.
sub_type: Optional[NodeSubType] = None


class LineageSchema(BaseModel):
Expand All @@ -34,3 +40,26 @@ def to_directed_graph(self) -> nx.DiGraph:
graph = nx.DiGraph()
graph.add_edges_from(self.edges)
return graph


class NodeDependsOnNodesSchema(BaseModel):
    # A node together with the unique ids of the nodes it depends on.
    # `depends_on_nodes` is always normalized to a list by the validator
    # below: a missing/empty value becomes [] and seed ids are dropped.
    unique_id: NodeUniqueIdType
    depends_on_nodes: Optional[List[NodeUniqueIdType]] = None
    type: NodeType
    sub_type: Optional[NodeSubType] = None

    @validator("depends_on_nodes", pre=True, always=True)
    def set_depends_on_nodes(cls, depends_on_nodes):
        """Normalize the raw dependency list before field validation.

        Returns an empty list for a falsy input; otherwise returns the
        given ids with every entry rejected by ``_format_node_id`` (and
        any other falsy entry) removed.
        """
        if not depends_on_nodes:
            return []
        candidates = map(cls._format_node_id, depends_on_nodes)
        return [formatted for formatted in candidates if formatted]

    @classmethod
    def _format_node_id(cls, node_id: str):
        """Return ``node_id`` unchanged, or ``None`` when it is a seed id."""
        # Currently we don't save seeds in our artifacts.
        # We remove seeds from the lineage graph (as long as we don't support them).
        is_seed = _SEED_PATH_PATTERN.search(node_id) is not None
        return None if is_seed else node_id
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,33 @@
{% if not elementary.relation_exists(exposures_relation) %}
{% set exposures_relation = ref('elementary', 'dbt_exposures') %}
{% endif %}
{% set models_depends_on_nodes_query %}
with dbt_models as (
select unique_id, depends_on_nodes from {{ ref('elementary', 'dbt_models') }}
{% if exclude_elementary %}
where package_name != 'elementary'
{% endif %}
),

dbt_sources as (
select unique_id from {{ ref('elementary', 'dbt_sources') }}
),

dbt_exposures as (
select unique_id, depends_on_nodes from {{ exposures_relation }}
)

{% set models_depends_on_nodes_query %}
select
unique_id,
depends_on_nodes,
materialization,
'model' as type
from dbt_models
from {{ ref('elementary', 'dbt_models') }}
{% if exclude_elementary %}
where package_name != 'elementary'
{% endif %}
union all
select
unique_id,
null as depends_on_nodes,
null as materialization,
'source' as type
from dbt_sources
from {{ ref('elementary', 'dbt_sources') }}
union all
select
unique_id,
depends_on_nodes,
null as materialization,
'exposure' as type
from dbt_exposures
from {{ exposures_relation }}
{% endset %}

{% set models_depends_on_agate = run_query(models_depends_on_nodes_query) %}
{% do return(elementary.agate_to_dicts(models_depends_on_agate)) %}
{% endmacro %}
15 changes: 14 additions & 1 deletion elementary/monitor/fetchers/lineage/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from typing import List

from elementary.clients.fetcher.fetcher import FetcherClient
from elementary.monitor.fetchers.lineage.schema import NodeDependsOnNodesSchema
from elementary.monitor.api.lineage.schema import NodeDependsOnNodesSchema

# Maps the `materialization` value of a query result row to the lineage
# node sub type. Both "table" and "incremental" map to the "table" sub type.
# Materializations not listed here yield None when looked up with `.get`.
_MATERIALIZATION_TO_SUB_TYPE = {
"view": "view",
"table": "table",
"incremental": "table",
}


class LineageFetcher(FetcherClient):
Expand All @@ -27,6 +33,13 @@ def get_nodes_depends_on_nodes(
if node_depends_on_nodes_result.get("depends_on_nodes")
else None,
type=node_depends_on_nodes_result.get("type"),
sub_type=self.get_node_sub_type(node_depends_on_nodes_result),
)
)
return nodes_depends_on_nodes

@staticmethod
def get_node_sub_type(node_depends_on_nodes_result: dict):
    """Derive the node sub type from a result row's ``materialization``.

    Returns the mapped sub type, or ``None`` when the row has no (or an
    empty) ``materialization`` value or the value has no mapping.
    """
    raw_materialization = node_depends_on_nodes_result.get("materialization")
    if not raw_materialization:
        return None
    return _MATERIALIZATION_TO_SUB_TYPE.get(raw_materialization)
32 changes: 0 additions & 32 deletions elementary/monitor/fetchers/lineage/schema.py

This file was deleted.