Add cleanup task to the tutorial (#501)
tatiana authored Jul 5, 2022
1 parent 03dfcb5 commit 6188d4f
Showing 3 changed files with 66 additions and 28 deletions.
92 changes: 65 additions & 27 deletions TUTORIAL.md
@@ -76,24 +76,11 @@ need to be consistent with whatever names you choose throughout the remainder of

## Install Astro-SDK on your local machine

* Run the following command to install the Python SDK, with the extra packages necessary to access AWS and Snowflake.

```shell
pip install 'astro-sdk-python[amazon,snowflake]>=0.11'
```

* Create the following environment variables
@@ -226,7 +213,8 @@ dag = DAG(
```python
)
with dag:
    # Extract a file with a header from S3 into a temporary Table, referenced by the
    # variable `orders_data`
    orders_data = aql.load_file(
        # data file needs to have a header row
        input_file=File(
@@ -235,16 +223,17 @@ with dag:
        output_table=Table(conn_id=SNOWFLAKE_CONN_ID),
    )

    # Create a Table object for customer data in our Snowflake database
    customers_table = Table(
        name=SNOWFLAKE_CUSTOMERS,
        conn_id=SNOWFLAKE_CONN_ID,
    )

    # Filter the orders data and then join with the customer table,
    # saving the output into a temporary table referenced by the Table instance `joined_data`
    joined_data = join_orders_customers(filter_orders(orders_data), customers_table)

    # Merge the joined data into our reporting table, based on the order_id.
    # If there's a conflict in the customer_id or customer_name then use the ones from
    # the joined data
    reporting_table = aql.merge(
@@ -259,6 +248,10 @@
    )

    purchase_dates = transform_dataframe(reporting_table)

    # Delete temporary and unnamed tables created by `load_file` and `transform`; in this
    # example, both `orders_data` and `joined_data`
    aql.cleanup()
```
***
# Run it!
@@ -279,16 +272,22 @@ Click on the astro_orders DAG name to see the grid view of its execution:
## Extract
To extract from S3 into a SQL Table, we need only specify the location on S3 of the data and a connection for Snowflake that the Python SDK can use for internal purposes:
```python
# Extract a file with a header from S3 into a temporary Table, referenced by the
# variable `orders_data`
orders_data = aql.load_file(
    # data file needs to have a header row
    input_file=File(path=S3_FILE_PATH + "/orders_data_header.csv", conn_id=S3_CONN_ID),
    output_table=Table(conn_id=SNOWFLAKE_CONN_ID),
)
```
In this example, we do not want to persist the content of `orders_data` after the DAG completes.
When we create a `Table` object without a name, that table is considered temporary.
The Astro SDK deletes all temporary tables if the user adds the task `aql.cleanup` to the DAG.
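
By contrast, naming the output table makes it persistent, so `aql.cleanup` will leave it alone. A minimal sketch; the table name `orders_persistent` is illustrative, not part of the tutorial:

```python
# Sketch: a named output table is not temporary, so aql.cleanup() will not drop it.
# The name "orders_persistent" is an assumption for illustration.
persistent_orders = aql.load_file(
    input_file=File(path=S3_FILE_PATH + "/orders_data_header.csv", conn_id=S3_CONN_ID),
    output_table=Table(name="orders_persistent", conn_id=SNOWFLAKE_CONN_ID),
)
```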
## Transform
We can execute a filter and join in a single line of code to fill in a value for `joined_data`:
@@ -306,22 +305,28 @@ def join_orders_customers(filtered_orders_table: Table, customers_table: Table):
```python
        ON f.customer_id = c.customer_id"""


# Create a Table object for customer data in our Snowflake database
customers_table = Table(
    name=SNOWFLAKE_CUSTOMERS,
    conn_id=SNOWFLAKE_CONN_ID,
)

# Filter the orders data and then join with the customer table,
# saving the output into a temporary table referenced by the Table instance `joined_data`
joined_data = join_orders_customers(filter_orders(orders_data), customers_table)
```
Since the table `customers_table` is named, it is not considered temporary and will not be deleted
at the end of the DAG run. The table `joined_data`, however, was not named in this DAG,
so it will be deleted by the `aql.cleanup` step.
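
The join above consumes the output of `filter_orders`, whose definition this diff elides. A minimal sketch of such a transform, assuming the SDK's `{{ }}` parameter templating; the filter condition is illustrative:

```python
# Sketch only: the WHERE clause is an assumption, not the tutorial's actual filter.
@aql.transform
def filter_orders(input_table: Table):
    return "SELECT * FROM {{input_table}} WHERE amount > 150"
```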
## Merge
As our penultimate transformation, we call a database-agnostic merge function:
```python
# Merge the joined data into our reporting table, based on the order_id.
# If there's a conflict in the customer_id or customer_name then use the ones from
# the joined data
reporting_table = aql.merge(
@@ -336,9 +341,10 @@
)
```
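
The diff elides the arguments of `aql.merge`. For orientation, here is a sketch of a complete call in the same style; the constant `SNOWFLAKE_REPORTING` and every parameter value are assumptions inferred from the comments above, and the keyword names follow the SDK's documented `merge` interface, which may differ in the version pinned by this tutorial:

```python
# Sketch only: SNOWFLAKE_REPORTING and all values are illustrative.
reporting_table = aql.merge(
    target_table=Table(name=SNOWFLAKE_REPORTING, conn_id=SNOWFLAKE_CONN_ID),
    source_table=joined_data,
    target_conflict_columns=["order_id"],  # the merge key named in the comment above
    columns=["customer_id", "customer_name"],  # columns taken from the joined data
    if_conflicts="update",  # on conflict, keep the joined data's values
)
```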


## Dataframe transformation

As an illustration of the `@aql.dataframe` decorator, we show a simple dataframe operation:

```python
@aql.dataframe
```
@@ -354,3 +360,35 @@ After all that, you'll find the meager output of this example in the logs of the

To view this log output, in the grid view of the DAG, click on the green box next to
`transform_dataframe` and then on the "Log" button.
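
The body of `transform_dataframe` is elided by this diff. Below is a minimal, self-contained sketch of a `@aql.dataframe` task consistent with the `purchase_dates` variable above; the `purchase_date` column name is an assumption:

```python
from astro import sql as aql
from pandas import DataFrame


# Sketch: print and return a single column; the column name "purchase_date" is assumed.
@aql.dataframe
def transform_dataframe(df: DataFrame):
    purchase_dates = df.loc[:, "purchase_date"]
    print("purchase dates:", purchase_dates)
    return purchase_dates
```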


## Clean up temporary tables

Finally, if there were temporary tables (`table.temp=True` or unnamed tables) in the DAG,
the user should consider cleaning them up at the end of the DAG run.

It is possible to achieve this using one of the following approaches:

(1) Clean all temporary tables by the end of the DAG run

```python
# Delete all temporary tables
aql.cleanup()
```

(2) Specify a subset of temporary tables to be deleted

The user may also explicitly select the subset of tables to be deleted,
using one of the following:

```python
aql.cleanup([orders_data, joined_data])
```

or
```python
[orders_data, joined_data] >> aql.cleanup()
```

In all scenarios, even if the user passes a non-temporary table, only temporary
tables will actually be deleted.
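
Conversely, a named table can be marked temporary explicitly through the `temp` flag mentioned above, making it eligible for cleanup; the table name here is illustrative:

```python
# Sketch: named but explicitly temporary, so aql.cleanup() will delete it.
scratch_table = Table(name="scratch_orders", temp=True, conn_id=SNOWFLAKE_CONN_ID)
```
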
2 changes: 1 addition & 1 deletion docs/aep/AEP-2-table-cleanup.md
@@ -16,7 +16,7 @@ Authors:

## Motivation

When running `aql.transform` or `aql.load_file` jobs, unless the user creates named Table instances, the Astro SDK implicitly creates temporary tables in the database every time the DAG runs. These extra tables are currently handled by placing all tables into a “temporary schema”. The thought behind these schemas is that a DBA can easily delete the schema if it starts to get full. This answer is insufficient, so a better solution for table cleanup should be enacted.

Temporary schemas are unfortunately a less-than-ideal solution to this problem. Users would need to define the same named schema across all of their databases, and there’s no guarantee that a user would even have access to more than one schema in a certain database (e.g. if you only have write access to a sandbox in your Snowflake instance, you wouldn’t necessarily want to delete your sandbox).
## Proposal
Binary file modified docs/images/select-dag-grid-view.png
