Support s3_data_dir and s3_data_naming
chronitis committed Feb 16, 2022
1 parent 5991fc4 commit 89d1830
Showing 5 changed files with 84 additions and 18 deletions.
37 changes: 23 additions & 14 deletions README.md
@@ -41,16 +41,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profile_name`

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---------------- |-------------------------------------------------------------------------------- |----------- |---------------------- |
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
| s3_data_naming  | How to generate table paths in `s3_data_dir`: `uuid` or `schema_table`          | Optional   | `uuid`                 |


**Example profiles.yml entry:**
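The example entry itself is collapsed in this diff view; a minimal sketch assembled from the options documented above (the profile name, bucket names, and region are placeholders) might look like:

```yaml
athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://bucket/dbt/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      aws_profile_name: my-profile    # optional
      s3_data_dir: s3://bucket2/dbt/  # optional
      s3_data_naming: uuid            # optional: uuid or schema_table
```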
@@ -78,9 +81,7 @@ _Additional information_
#### Table Configuration

* `external_location` (`default=none`)
* If set, the full S3 path in which the table will be saved.
* `partitioned_by` (`default=none`)
* An array list of columns by which the table will be partitioned
* Limited to creation of 100 partitions (_currently_)
@@ -93,7 +94,15 @@ _Additional information_
* Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, or `TEXTFILE`
* `field_delimiter` (`default=none`)
* Custom field delimiter, for when format is set to `TEXTFILE`
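These table options are set per-model via dbt's `config()` block. A hypothetical model illustrating a few of them (the model body, source, column, and S3 path are placeholders):

```sql
-- models/example.sql (illustrative only)
{{ config(
    materialized='table',
    format='parquet',
    partitioned_by=['created_date'],
    external_location='s3://bucket/prefix/'
) }}

select id, created_date
from {{ source('raw', 'events') }}
```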


The location in which a table is saved is determined by:

1. If `external_location` is defined, that value is used.
2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`:
+ `s3_data_naming=uuid`: `{s3_data_dir}/{uuid4()}/`
+ `s3_data_naming=schema_table`: `{s3_data_dir}/{schema}/{table}/`
3. Otherwise, the default location for a CTAS query is used, which will depend on how your workgroup is configured.
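The resolution order above can be sketched as a standalone Python helper (hypothetical, mirroring the adapter's `s3_table_location` logic; `None` stands in for the workgroup's default CTAS location):

```python
from typing import Optional
from uuid import uuid4

def resolve_table_location(
    external_location: Optional[str],
    s3_data_dir: Optional[str],
    s3_data_naming: str,
    schema: str,
    table: str,
) -> Optional[str]:
    """Illustrative sketch of the table-location precedence."""
    # 1. An explicit external_location always wins.
    if external_location is not None:
        return external_location
    # 2. Otherwise, build a path under s3_data_dir per s3_data_naming.
    if s3_data_dir is not None:
        if s3_data_naming == "uuid":
            return f"{s3_data_dir}{uuid4()}/"
        if s3_data_naming == "schema_table":
            return f"{s3_data_dir}{schema}/{table}/"
        raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}")
    # 3. Fall back to the workgroup's default CTAS location.
    return None
```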

More information: [CREATE TABLE AS][create-table-as]

[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at
2 changes: 2 additions & 0 deletions dbt/adapters/athena/connections.py
@@ -40,6 +40,8 @@ class AthenaCredentials(Credentials):
poll_interval: float = 1.0
_ALIASES = {"catalog": "database"}
num_retries: Optional[int] = 5
s3_data_dir: Optional[str] = None
s3_data_naming: Optional[str] = "uuid"

@property
def type(self) -> str:
59 changes: 56 additions & 3 deletions dbt/adapters/athena/impl.py
@@ -38,11 +38,64 @@ def convert_datetime_type(
return "timestamp"

@available
def s3_uuid_table_location(self):
def s3_table_prefix(self) -> str:
"""
Returns the root location for storing tables in S3.
This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not.
We generate a value here even if `s3_data_dir` is not set,
since creating a seed table requires a non-default location.
"""
conn = self.connections.get_thread_connection()
creds = conn.credentials
if creds.s3_data_dir is not None:
return creds.s3_data_dir
else:
return f"{creds.s3_staging_dir}tables/"

@available
def s3_uuid_table_location(self) -> str:
"""
Returns a random location for storing a table, using a UUID as
the final directory part
"""
return f"{self.s3_table_prefix()}{str(uuid4())}/"


@available
def s3_schema_table_location(self, schema_name: str, table_name: str) -> str:
"""
Returns a fixed location for storing a table determined by the
(athena) schema and table name
"""
return f"{self.s3_table_prefix()}{schema_name}/{table_name}/"

@available
def s3_table_location(self, schema_name: str, table_name: str) -> str:
"""
Returns either a UUID or database/table prefix for storing a table,
depending on the value of `s3_data_naming`
"""
conn = self.connections.get_thread_connection()
creds = conn.credentials
if creds.s3_data_naming == "schema_table":
return self.s3_schema_table_location(schema_name, table_name)
elif creds.s3_data_naming == "uuid":
return self.s3_uuid_table_location()
else:
raise ValueError(f"Unknown value for s3_data_naming: {creds.s3_data_naming}")

@available
def has_s3_data_dir(self) -> bool:
"""
Returns true if the user has specified `s3_data_dir`, and
we should set `external_location`.
"""
conn = self.connections.get_thread_connection()
creds = conn.credentials
return creds.s3_data_dir is not None


@available
def clean_up_partitions(
@@ -12,6 +12,8 @@
with (
{%- if external_location is not none and not temporary %}
external_location='{{ external_location }}',
{%- elif adapter.has_s3_data_dir() -%}
external_location='{{ adapter.s3_table_location(relation.schema, relation.identifier) }}',
{%- endif %}
{%- if partitioned_by is not none %}
partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
@@ -21,7 +21,7 @@
{%- endfor -%}
)
stored as parquet
location '{{ adapter.s3_table_location(model["schema"], model["alias"]) }}'
tblproperties ('classification'='parquet')
{% endset %}

