Support s3_data_dir and s3_data_naming (#39)
* Support s3_data_dir and s3_data_naming
nicor88 authored Nov 21, 2022
1 parent d5bed36 commit eb49b61
Showing 6 changed files with 84 additions and 36 deletions.
56 changes: 38 additions & 18 deletions README.md
@@ -47,16 +47,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profil

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- |
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| Option | Description | Required? | Example |
|---------------- |--------------------------------------------------------------------------------|----------- |-----------------------|
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |


**Example profiles.yml entry:**
```yaml
Expand All @@ -66,6 +69,8 @@ athena:
dev:
type: athena
s3_staging_dir: s3://athena-query-results/dbt/
s3_data_dir: s3://your_s3_bucket/dbt/
s3_data_naming: schema_table
region_name: eu-west-1
schema: dbt
database: awsdatacatalog
@@ -84,9 +89,7 @@ _Additional information_
#### Table Configuration

* `external_location` (`default=none`)
* The location where Athena saves your table in Amazon S3
* If `none` then it will default to `{s3_staging_dir}/tables`
* If you are using a static value, when your table/partition is recreated underlying data will be cleaned up and overwritten by new data
* If set, the full S3 path in which the table will be saved.
* `partitioned_by` (`default=none`)
* An array list of columns by which the table will be partitioned
* Limited to creation of 100 partitions (_currently_)
@@ -104,11 +107,23 @@ _Additional information_
* `table_properties`: table properties to add to the table, valid for Iceberg only
* `strict_location` (`default=True`): when working with Iceberg it's possible to rename tables; to do so, tables must not share the same location. Setting `strict_location` to *false* allows each table to be created at a unique location.
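Putting a few of these options together, a minimal model sketch for a plain (non-Iceberg) table; the model, bucket, and source names are illustrative:

```sql
-- models/events_by_day.sql (illustrative model name)
{{ config(
    materialized='table',
    format='parquet',
    partitioned_by=['dt'],
    external_location='s3://my-bucket/events_by_day/'
) }}

select user_id, event_name, dt
from {{ source('raw', 'events') }}  -- hypothetical source
```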

More information: [CREATE TABLE AS][create-table-as]
#### Table location
The location in which a table is saved is determined by:

1. If `external_location` is defined, that value is used.
2. If `s3_data_dir` is defined, the path is determined by that value and `s3_data_naming`.
3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/`.

Here are all the options available for `s3_data_naming`:
* `uuid`: `{s3_data_dir}/{uuid4()}/`
* `table`: `{s3_data_dir}/{table}/`
* `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`
* `schema_table`: `{s3_data_dir}/{schema}/{table}/`
* `schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`
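For example, with `s3_data_dir: s3://bucket2/dbt/` and a model `orders` in schema `dbt` (bucket, schema, and table names are illustrative), the options resolve to:

```
uuid                 s3://bucket2/dbt/<uuid4>/
schema_table         s3://bucket2/dbt/dbt/orders/
schema_table_unique  s3://bucket2/dbt/dbt/orders/<uuid4>/
```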

It's possible to set `s3_data_naming` globally in the target profile, override it in a table's config, or set it for groups of models in `dbt_project.yml`, as in the sketch below.
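A minimal `dbt_project.yml` sketch, assuming a project named `my_project` with a `marts` model folder (both names are illustrative):

```yaml
# dbt_project.yml
models:
  my_project:            # illustrative project name
    marts:               # applies to every model under models/marts/
      +s3_data_dir: s3://bucket2/dbt/
      +s3_data_naming: schema_table_unique
```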

[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at
[invocation_id]: https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id
[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html

#### Supported functionality

@@ -129,7 +144,6 @@ To get started just add this as your model:
materialized='table',
format='iceberg',
partitioned_by=['bucket(5, user_id)'],
strict_location=false,
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
}
@@ -193,6 +207,12 @@ You can run the tests using `make`:
make run_tests
```

### Helpful Resources

* [Athena CREATE TABLE AS](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html)
* [dbt run_started_at](https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at)
* [dbt invocation_id](https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id)

### Community

* [fishtown-analytics/dbt][fishtown-analytics/dbt]
4 changes: 4 additions & 0 deletions dbt/adapters/athena/connections.py
@@ -45,6 +45,8 @@ class AthenaCredentials(Credentials):
poll_interval: float = 1.0
_ALIASES = {"catalog": "database"}
num_retries: Optional[int] = 5
s3_data_dir: Optional[str] = None
s3_data_naming: Optional[str] = "schema_table_unique"

@property
def type(self) -> str:
Expand All @@ -64,6 +66,8 @@ def _connection_keys(self) -> Tuple[str, ...]:
"poll_interval",
"aws_profile_name",
"endpoing_url",
"s3_data_dir",
"s3_data_naming",
)


45 changes: 30 additions & 15 deletions dbt/adapters/athena/impl.py
@@ -46,26 +46,41 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str:
return "timestamp"

@available
def s3_uuid_table_location(self):
def s3_table_prefix(self, s3_data_dir: str) -> str:
"""
Returns the root location for storing tables in S3.
This is `s3_data_dir`, if set, or `s3_staging_dir/tables` if not.
We generate a value here even if `s3_data_dir` is not set,
since creating a seed table requires a non-default location.
"""
conn = self.connections.get_thread_connection()
client = conn.handle
return f"{client.s3_staging_dir}tables/{str(uuid4())}/"
creds = conn.credentials
if s3_data_dir is not None:
return s3_data_dir
else:
return path.join(creds.s3_staging_dir, "tables")

@available
def s3_unique_location(self, external_location, strict_location, staging_dir, relation_name):
def s3_table_location(self, s3_data_dir: str, s3_data_naming: str, schema_name: str, table_name: str) -> str:
"""
Generate a unique, non-overlapping location.
Returns either a UUID- or schema/table-based path for storing a table,
depending on the value of s3_data_naming
"""
unique_id = str(uuid4())
if external_location is not None:
if not strict_location:
if external_location.endswith("/"):
external_location = external_location[:-1]
external_location = f"{external_location}_{unique_id}/"
else:
base_path = path.join(staging_dir, f"{relation_name}_{unique_id}")
external_location = f"{base_path}/"
return external_location
mapping = {
"uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())) + "/",
"table": path.join(self.s3_table_prefix(s3_data_dir), table_name) + "/",
"table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())) + "/",
"schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name) + "/",
"schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4()))
+ "/",
}

table_location = mapping.get(s3_data_naming)

if table_location is None:
raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}")

return table_location
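# Illustrative usage (hypothetical values), assuming s3_data_dir is set:
#   self.s3_table_location("s3://bucket2/dbt/", "schema_table", "dbt", "orders")
#   returns "s3://bucket2/dbt/dbt/orders/"
# With s3_data_dir=None, s3_table_prefix falls back to {s3_staging_dir}/tables.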

@available
def clean_up_partitions(self, database_name: str, table_name: str, where_condition: str):
@@ -50,20 +50,23 @@

{% macro create_iceberg_table_definition(relation, dest_columns) -%}
{%- set external_location = config.get('external_location', default=none) -%}
{%- set strict_location = config.get('strict_location', default=true) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}
{%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%}
{%- set table_properties_formatted = [] -%}
{%- set dest_columns_with_type = [] -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%}

{%- for k in table_properties -%}
{% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%}
{%- endfor -%}

{%- set table_properties_csv= table_properties_formatted | join(', ') -%}

{%- set external_location = adapter.s3_unique_location(external_location, strict_location, target.s3_staging_dir, relation.name) -%}
{%- if external_location is none %}
{%- set external_location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) -%}
{%- endif %}

{%- for col in dest_columns -%}
{% set dtype = iceberg_data_type(col.dtype) -%}
@@ -6,13 +6,17 @@
{%- set field_delimiter = config.get('field_delimiter', default=none) -%}
{%- set format = config.get('format', default='parquet') -%}
{%- set write_compression = config.get('write_compression', default=none) -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%}

create table
{{ relation }}

with (
{%- if external_location is not none and not temporary %}
external_location='{{ external_location }}',
{%- else -%}
external_location='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) }}',
{%- endif %}
{%- if partitioned_by is not none %}
partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
4 changes: 3 additions & 1 deletion dbt/include/athena/macros/materializations/seeds/helpers.sql
@@ -10,6 +10,8 @@
{% macro athena__create_csv_table(model, agate_table) %}
{%- set column_override = model['config'].get('column_types', {}) -%}
{%- set quote_seed_column = model['config'].get('quote_columns', None) -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
{%- set s3_data_naming = model['config'].get('s3_data_naming', target.s3_data_naming) -%}

{% set sql %}
create external table {{ this.render() }} (
@@ -21,7 +23,7 @@
{%- endfor -%}
)
stored as parquet
location '{{ adapter.s3_uuid_table_location() }}'
location '{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, model["schema"], model["alias"]) }}'
tblproperties ('classification'='parquet')
{% endset %}

