Data lineage helps you keep track of the origin of data, the transformations done on it over time and its impact in an organization. Airflow has built-in support to send lineage metadata to Apache Atlas. This plugin leverages that and enables you to create lineage metadata for operation on Snowflake entities. This lineage can then be viewed on Atlan
You need to have the following setup before you can start using this:
pip install git+https://github.com/atlanhq/atlan-lineage-airflow
To send lineage to Atlan, change the backend
value in airflow.cfg like so:
[lineage]
backend = atlan.lineage.backend.Atlan
Generate an access token on Atlan and add the following to airflow.cfg
[atlan]
url = domain.atlan.com # no quotes
token = my-secret-token # no quotes
The value of url
should be the URL of your Atlan instance.
- At the top of dag file, import the modules for entity you want create lineage for
from atlan.lineage.assets import SnowflakeTable
- For every task, configure Airflow operator’s inlets and outlets parameters
# Sample task
# Creates Snowflake table `my_new_table` from Snowflake table `my_table`
# Inlet for task - Snowflake table 'my_table'
# Outlet for task - file 'my_new_table'
# Let snowflake account name: mi04151.ap-south-1
# Let snowflake database name: biw
# Let snowflake schema name: public
sample_task = SnowflakeOperator(
task_id = "sample_task",
## query snowflake
sql = "CREATE TABLE MY_NEW_TABLE AS SELECT * FROM MY_TABLE",
## snowflake connection id as configured in Airflow connections
snowflake_conn_id = "snowflake_common",
## define inlets
inlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/public/my_table",
name = "my_table")]
},
## define outlets
outlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/public/my_new_table",
name = "my_new_table")]
}
)
SnowflakeTable takes 2 arguments - table_alias
and name
:
table_alias
: This is a string representing the location of a Snowflake table i.e. the account, database, schema a table is present in. It follows formatsnowflake-account/snowflake-database/snowflake-schema/snowflake-table
, wheresnowflake-account
is the name of your snowflake account, etc. Read more about Snowflake account naming (here)[https://docs.snowflake.com/en/user-guide/python-connector-api.html#usage-notes-for-the-account-parameter-for-the-connect-method].name
: This is the name of the table in question
Let's call this lineage-object
Inlets
and Outlets
are parameters of Airflow operators that allow us to specify task input and output. They follow the following format:
{
"key": [
lineage-object_1,
lineage-object_2,
.
.
.
]
}
Only keys datasets
, task_ids
, auto
are accepted. If the key is auto
, then value should be True
and not a list
This plugin supports the Airflow API to create inlets and outlets. So inlets can be defined in the following ways:
- by a list of dataset {"datasets": [dataset1, dataset2]}
- can be configured to look for outlets from upstream tasks {"task_ids": ["task_id1", "task_id2"]}
- can be configured to pick up outlets from direct upstream tasks {"auto": True}
- a combination of them
## task definition
sample_task:
operator: airflow.contrib.operators.snowflake_operator.SnowflakeOperator
sql: CREATE TABLE MY_NEW_TABLE AS SELECT * FROM MY_TABLE
snowflake_conn_id: snowflake_common
inlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/public/my_table", name = "my_table")]}'
outlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/public/my_new_table", name = "my_new_table")]}'
The inlets and outlets are defined same as above, just the dictionary is enclosed in quotes.
Note:- We used dag-factory to create sample YAML dags. We made some changes to enable support for inlets
& outlets
parameters.
Let's take a look at an example dag and see what the result looks like on Atlas:
## DAG name: customer_distribution_apac
import logging
import airflow
from airflow import DAG
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from atlan.lineage.assets import SnowflakeTable
args = {"owner": "Atlan", "start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(dag_id="customer_distribution_apac", default_args=args, schedule_interval=None)
with dag:
## task definition
customer_nation_join = SnowflakeOperator(
task_id = "customer_nation_join",
sql = """CREATE TABLE biw.private.customer_enriched AS
SELECT c.c_custkey,
c.c_acctbal,
c.c_mktsegment,
n.n_nationkey,
n.n_name
FROM biw.raw.customer c
INNER JOIN biw.raw.nation n ON c.c_nationkey = n.n_nationkey""",
snowflake_conn_id = "snowflake_common",
inlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/raw/customer",
name = "customer"),
SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/raw/nation",
name = "nation")]
},
outlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/private/customer_enriched",
name = "customer_enriched")]
}
)
## task definition
filter_apac = SnowflakeOperator(
task_id = "filter_apac",
sql = """CREATE TABLE biw.private.customer_apac AS
SELECT *
FROM biw.private.customer_enriched
WHERE n_name IN ('CHINA',
'INDIA',
'INDONESIA',
'VIETNAM',
'PAKISTAN',
'NEW ZEALAND',
'AUSTRALIA')""",
snowflake_conn_id = "snowflake_common",
inlets: {
"auto": True
},
outlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/private/customer_apac",
name = "customer_apac")]
}
)
## task definition
aggregate_apac = SnowflakeOperator(
task_id = "aggregate_apac",
sql = """CREATE TABLE biw.cubes.customer_distribution AS
SELECT count(*) AS num_customers,
n_name AS nation
FROM biw.private.customer_apac
GROUP BY n_name""",
snowflake_conn_id = "snowflake_common",
inlets: {
"auto": True
}
outlets: {
"datasets": [SnowflakeTable(table_alias = "mi04151.ap-south-1/biw/cubes/customer_distribution",
name = "customer_distribution")]
}
)
customer_nation_join >> filter_apac >> aggregate_apac
In the above dag, we have three tasks:
customer_nation_join
: This task joins two Snowflake tablesnation
andcustomer
and creates the Snowflake tablecustomer_enriched
. The input of of this task are Snowflake tablesnation
andcustomer
, and the outputcustomer_enriched
table, so this is what task inlets and outlets are configured as.filter_apac
: This task filters out the customers in tablecustomer_enriched
that lies in APAC nations and creates the tablecustomer_apac
. The input of this task is the output of the upstream task and the output iscustomer_apac
aggregate_apac
: This task counts the customers present in each APAC nation and creates tablecustomer_distribition
. The input of this table is the output of upstream task and output is tablecustomer_distribution
-
Lineage on Atlas: This is what lineage from the dag above is represented in Atlas - you can clearly see which table was produced from which task, which task has which table as input, etc.
The icons in green represent Airflow task - one can see the inputs and outputs for each task. The yellow arrows represent lineage of an entity and the red arrows represent impact of the entity.
- DAG Entity on Atlas: This is what the Airflow DAG entity looks on Atlas. You can see the tasks present in the dag, along with other meta.
- DAG Entity on Atlas: This is what the Airflow Operator entity looks on Atlas. You can see the DAG that the operator is part of, the inputs and outputs for the operator.
If you are using YAML configs to create Airflow DAGs, this is what the above dag would look like
customer_distribution_apac:
default_args:
owner: 'Atlan'
start_date: 2020-01-01
retries: 1
retry_delay_sec: 30
description: Create an aggregated table to see distribution of customers across APAC nations
schedule_interval: '0 0 * 12 0'
concurrency: 1
max_active_runs: 1
dagrun_timeout_sec: 60
default_view: 'tree'
orientation: 'LR'
tasks:
## task definition
customer_nation_join:
operator: airflow.contrib.operators.snowflake_operator.SnowflakeOperator
sql: create table biw.private.customer_enriched as select c.c_custkey, c.c_acctbal, c.c_mktsegment, n.n_nationkey, n.n_name from biw.raw.customer c inner join biw.raw.nation n on c.c_nationkey = n.n_nationkey
snowflake_conn_id: "snowflake_common"
inlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/raw/customer", name = "customer"),SnowflakeTable(table_alias="mi04151.ap-south-1/biw/raw/nation", name = "nation")]}'
outlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/private/customer_enriched", name = "customer_enriched")]}'
## task definition
filter_apac:
operator: airflow.contrib.operators.snowflake_operator.SnowflakeOperator
sql: create table biw.private.customer_apac as select * from biw.private.customer_enriched where n_name in ('CHINA', 'INDIA', 'INDONESIA', 'VIETNAM', 'PAKISTAN', 'NEW ZEALAND', 'AUSTRALIA')
snowflake_conn_id: "snowflake_common"
inlets: '{"auto":True}'
outlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/private/customer_apac", name = "customer_apac")]}'
dependencies: [customer_nation_join]
## task definition
aggregate_apac:
operator: airflow.contrib.operators.snowflake_operator.SnowflakeOperator
sql: create table biw.cubes.customer_distribution as select count(*) as num_customers, n_name as nation from biw.private.customer_apac group by n_name
snowflake_conn_id: "snowflake_common"
inlets: '{"auto":True}'
outlets: '{"datasets":[SnowflakeTable(table_alias="mi04151.ap-south-1/biw/cubes/customer_distribution", name = "customer_distribution")]}'
dependencies: [filter_apac]