Skip to content

Commit

Permalink
Example Dynamic Map Task with Astro-SDK (#575)
Browse files Browse the repository at this point in the history
Description

Add Example DAG for Dynamic Map Task with Astro-SDK.
DAG does

    load data from s3 to bigquery table
    get_campaigns task execute SQL query and get a Table object
    expanding summarize_campaign task so if the bigquery table has n row then n copy of summarize_campaign will be created
    clean table

closes: #377
  • Loading branch information
pankajastro committed Jul 27, 2022
1 parent 0b9afa3 commit 35eab19
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
57 changes: 57 additions & 0 deletions example_dags/example_bigquery_dynamic_map_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
This Example DAG:
- Pull CSV from S3 and load in bigquery table
- Run select query on bigquery table
- Expand on the returned rows i.e if bigquery table contain n rows then
n copy of ``summarize_campaign`` task will be created dynamically
using dynamic task mapping
"""
import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

from astro import sql as aql
from astro.files import File
from astro.sql import Table
from astro.sql.table import Metadata

ASTRO_BIGQUERY_DATASET = os.getenv("ASTRO_BIGQUERY_DATASET", "dag_authoring")
ASTRO_GCP_CONN_ID = os.getenv("ASTRO_GCP_CONN_ID", "google_cloud_default")
ASTRO_S3_BUCKET = os.getenv("S3_BUCKET", "s3://tmp9")


@task
def summarize_campaign(capaign_id: str):
print(capaign_id)


def handle_result(result):
return result.fetchall()


with DAG(
dag_id="example_dynamic_map_task",
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False,
) as dag:

@aql.run_raw_sql(handler=handle_result)
def get_campaigns(table: Table):
return """select id from {{table}}"""

bq_table = aql.load_file(
input_file=File(path=f"{ASTRO_S3_BUCKET}/ads.csv"),
output_table=Table(
metadata=Metadata(
schema=ASTRO_BIGQUERY_DATASET,
),
conn_id=ASTRO_GCP_CONN_ID,
),
use_native_support=False,
)

summarize_campaign.expand(capaign_id=get_campaigns(bq_table))
aql.cleanup()
1 change: 1 addition & 0 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def session():
"example_google_bigquery_gcs_load_and_save",
"example_snowflake_partial_table_with_append",
"example_sqlite_load_transform",
"example_dynamic_map_task",
],
)
def test_example_dag(session, dag_id):
Expand Down

0 comments on commit 35eab19

Please sign in to comment.