"""MySQL DataHub Ingest DAG
This example demonstrates how to ingest metadata from MySQL into DataHub
from within an Airflow DAG. Note that the DB connection configuration is
embedded within the code.
"""
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from datahub.ingestion.run.pipeline import Pipeline

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["jdoe@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}


def ingest_from_mysql():
    # Build an in-process DataHub ingestion pipeline: pull metadata from
    # the MySQL source and push it to the DataHub REST sink.
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "mysql",
                "config": {
                    "username": "user",
                    "password": "pass",
                    "database": "db_name",
                    "host_port": "localhost:3306",
                },
            },
            "sink": {
                "type": "datahub-rest",
                "config": {"server": "http://localhost:8080"},
            },
        }
    )
    pipeline.run()
    # Re-raise any ingestion errors so the Airflow task is marked as failed.
    pipeline.raise_from_status()
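
# For reference, the same ingestion can also run outside Airflow via the
# DataHub CLI using an equivalent YAML recipe (a sketch; adjust credentials
# and the server URL for your deployment):
#
#   source:
#     type: mysql
#     config:
#       username: user
#       password: pass
#       database: db_name
#       host_port: localhost:3306
#   sink:
#     type: datahub-rest
#     config:
#       server: http://localhost:8080
#
# Saved as recipe.yml, it would be invoked with: datahub ingest -c recipe.yml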


with DAG(
    "datahub_mysql_ingest",
    default_args=default_args,
    description="An example DAG which ingests metadata from MySQL to DataHub",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["datahub-ingest"],
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_from_mysql",
        python_callable=ingest_from_mysql,
    )
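
# The task can be exercised locally (assuming a reachable MySQL instance and
# DataHub server) with Airflow's test command, e.g.:
#   airflow tasks test datahub_mysql_ingest ingest_from_mysql 2021-01-01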