From 89d18308a72bdb636d5e9b33ff2c4680fe6c8605 Mon Sep 17 00:00:00 2001 From: Gordon Ball Date: Fri, 17 Dec 2021 12:16:58 +0100 Subject: [PATCH 1/8] Support s3_data_dir and s3_data_naming --- README.md | 37 +++++++----- dbt/adapters/athena/connections.py | 2 + dbt/adapters/athena/impl.py | 59 ++++++++++++++++++- .../models/table/create_table_as.sql | 2 + .../macros/materializations/seeds/helpers.sql | 2 +- 5 files changed, 84 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 42f32459..b043dd01 100644 --- a/README.md +++ b/README.md @@ -41,16 +41,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profil A dbt profile can be configured to run against AWS Athena using the following configuration: -| Option | Description | Required? | Example | -|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- | -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | -| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| num_retries| Number of times to retry a failing query | Optional | `3` | `5` +| Option | Description | Required? 
| Example | +|---------------- |-------------------------------------------------------------------------------- |----------- |---------------------- | +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | +| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| num_retries | Number of times to retry a failing query | Optional | `3` | +| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | +| s3_data_naming | How to generate table paths in `s3_data_dir`: `uuid/schema_table` | Optional | `uuid` | + **Example profiles.yml entry:** ```yaml @@ -78,9 +81,7 @@ _Additional information_ #### Table Configuration * `external_location` (`default=none`) - * The location where Athena saves your table in Amazon S3 - * If `none` then it will default to `{s3_staging_dir}/tables` - * If you are using a static value, when your table/partition is recreated underlying data will be cleaned up and overwritten by new data + * If set, the full S3 path in which the table will be saved. 
* `partitioned_by` (`default=none`) * An array list of columns by which the table will be partitioned * Limited to creation of 100 partitions (_currently_) @@ -93,7 +94,15 @@ _Additional information_ * Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, or `TEXTFILE` * `field_delimiter` (`default=none`) * Custom field delimiter, for when format is set to `TEXTFILE` - + +The location in which a table is saved is determined by: + +1. If `external_location` is defined, that value is used. +2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`: + + `s3_data_naming=uuid`: `{s3_data_dir}/{uuid4()}/` + + `s3_data_naming=schema_table`: `{s3_data_dir}/{schema}/{table}/` +3. Otherwise, the default location for a CTAS query is used, which will depend on how your workgroup is configured. + More information: [CREATE TABLE AS][create-table-as] [run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index cc30862c..49811212 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -40,6 +40,8 @@ class AthenaCredentials(Credentials): poll_interval: float = 1.0 _ALIASES = {"catalog": "database"} num_retries: Optional[int] = 5 + s3_data_dir: Optional[str] = None + s3_data_naming: Optional[str] = "uuid" @property def type(self) -> str: diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index f815d70a..9530c49e 100644 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -38,11 +38,64 @@ def convert_datetime_type( return "timestamp" @available - def s3_uuid_table_location(self): + def s3_table_prefix(self) -> str: + """ + Returns the root location for storing tables in S3. + + This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not. + + We generate a value here even if `s3_data_dir` is not set, + since creating a seed table requires a non-default location. 
+ """ conn = self.connections.get_thread_connection() - client = conn.handle + creds = conn.credentials + if creds.s3_data_dir is not None: + return creds.s3_data_dir + else: + return f"{creds.s3_staging_dir}tables/" + + @available + def s3_uuid_table_location(self) -> str: + """ + Returns a random location for storing a table, using a UUID as + the final directory part + """ + return f"{self.s3_table_prefix()}{str(uuid4())}/" + + + @available + def s3_schema_table_location(self, schema_name: str, table_name: str) -> str: + """ + Returns a fixed location for storing a table determined by the + (athena) schema and table name + """ + return f"{self.s3_table_prefix()}{schema_name}/{table_name}/" + + @available + def s3_table_location(self, schema_name: str, table_name: str) -> str: + """ + Returns either a UUID or database/table prefix for storing a table, + depending on the value of s3_table + """ + conn = self.connections.get_thread_connection() + creds = conn.credentials + if creds.s3_data_naming == "schema_table": + return self.s3_schema_table_location(schema_name, table_name) + elif creds.s3_data_naming == "uuid": + return self.s3_uuid_table_location() + else: + raise ValueError(f"Unknown value for s3_data_naming: {creds.s3_data_naming}") + + @available + def has_s3_data_dir(self) -> bool: + """ + Returns true if the user has specified `s3_data_dir`, and + we should set `external_location + """ + conn = self.connections.get_thread_connection() + creds = conn.credentials + return creds.s3_data_dir is not None - return f"{client.s3_staging_dir}tables/{str(uuid4())}/" @available def clean_up_partitions( diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql index 504ba148..af398c9a 100644 --- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ 
-12,6 +12,8 @@ with ( {%- if external_location is not none and not temporary %} external_location='{{ external_location }}', + {%- elif adapter.has_s3_data_dir() -%} + external_location='{{ adapter.s3_table_location(relation.schema, relation.identifier) }}', {%- endif %} {%- if partitioned_by is not none %} partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, diff --git a/dbt/include/athena/macros/materializations/seeds/helpers.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql index bbc0e0e0..057d5d0d 100644 --- a/dbt/include/athena/macros/materializations/seeds/helpers.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -21,7 +21,7 @@ {%- endfor -%} ) stored as parquet - location '{{ adapter.s3_uuid_table_location() }}' + location '{{ adapter.s3_table_location(model["schema"], model["alias"]) }}' tblproperties ('classification'='parquet') {% endset %} From bce728dcc4155bc75ade40604b54c2de03f12fba Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sat, 19 Nov 2022 20:59:09 +0100 Subject: [PATCH 2/8] tweak docs --- README.md | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index dcb98372..24c6f17d 100644 --- a/README.md +++ b/README.md @@ -111,13 +111,11 @@ The location in which a table is saved is determined by: 2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`: + `s3_data_naming=uuid`: `{s3_data_dir}/{uuid4()}/` + `s3_data_naming=schema_table`: `{s3_data_dir}/{schema}/{table}/` -3. Otherwise, the default location for a CTAS query is used, which will depend on how your workgroup is configured. + + `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()/` +3. Otherwise, the staging dir location used by the adapter is used by default. 
-More information: [CREATE TABLE AS][create-table-as] +It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config. -[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at -[invocation_id]: https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id -[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html #### Supported functionality @@ -138,7 +136,6 @@ To get started just add this as your model: materialized='table', format='iceberg', partitioned_by=['bucket(5, user_id)'], - strict_location=false, table_properties={ 'optimize_rewrite_delete_file_threshold': '2' } @@ -202,6 +199,12 @@ You can run the tests using `make`: make run_tests ``` +### Helpful Resources + +* [Athena CREATE TABLE AS](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html) +* [dbt run_started_at](https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at) +* [dbt invocation_id](https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id) + ### Community * [fishtown-analytics/dbt][fishtown-analytics/dbt] From b06cbf6c84739f521282f3a8e9580e12b870f076 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sat, 19 Nov 2022 23:17:21 +0100 Subject: [PATCH 3/8] tweaks --- README.md | 26 ++++++++++--------- dbt/adapters/athena/connections.py | 2 ++ .../models/helpers_iceberg.sql | 3 +++ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 24c6f17d..82acdb8b 100644 --- a/README.md +++ b/README.md @@ -47,18 +47,18 @@ stored login info. You can configure the AWS profile name to use via `aws_profil A dbt profile can be configured to run against AWS Athena using the following configuration: -| Option | Description | Required? 
| Example | -|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- | -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | -| s3_data_naming | How to generate table paths in `s3_data_dir`: `uuid/schema_table` | Optional | `schema_table` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | -| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| num_retries| Number of times to retry a failing query | Optional | `3` | `5` +| Option | Description | Required? 
| Example | +|---------------- |--------------------------------------------------------------------------------|----------- |-------------------- | +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | +| s3_data_naming | How to generate table paths in `s3_data_dir`: `uuid/schema_table/schema_table_unique` | Optional | `schema_table` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| aws_profile_name| Profile to use from your AWS shared credentials file. 
| Optional | `my-profile` | +| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| num_retries| Number of times to retry a failing query | Optional | `3` | `5` **Example profiles.yml entry:** ```yaml @@ -68,6 +68,8 @@ athena: dev: type: athena s3_staging_dir: s3://athena-query-results/dbt/ + s3_data_dir: s3://your_s3_bucket/dbt/ + s3_data_naming: schema_table region_name: eu-west-1 schema: dbt database: awsdatacatalog diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 7e73a8fb..54cbc1aa 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -66,6 +66,8 @@ def _connection_keys(self) -> Tuple[str, ...]: "poll_interval", "aws_profile_name", "endpoing_url", + "s3_data_dir", + "s3_data_naming", ) diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql index 34fe0af8..52c0f8b5 100644 --- a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -57,6 +57,9 @@ {%- set table_properties_formatted = [] -%} {%- set dest_columns_with_type = [] -%} + {{ print("default_s3_data_naming:" ~ default_s3_data_naming) }} + {{ print("s3_data_naming:" ~ s3_data_naming) }} + {%- for k in table_properties -%} {% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%} {%- endfor -%} From 2b16fd3b9a14a991ea4a0690ec112671d2fb2549 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sat, 19 Nov 2022 23:42:52 +0100 Subject: [PATCH 4/8] tweak --- dbt/adapters/athena/impl.py | 2 +- .../athena/macros/materializations/models/helpers_iceberg.sql | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 356c310f..f4f85f38 100755 --- a/dbt/adapters/athena/impl.py +++ 
b/dbt/adapters/athena/impl.py @@ -95,7 +95,7 @@ def s3_table_location(self, s3_data_naming: str, schema_name: str, table_name: s elif s3_data_naming == "schema_table": return self.s3_schema_table_location(schema_name, table_name) elif s3_data_naming == "schema_table_unique": - return self.s3_schema_table_location(schema_name, table_name) + return self.s3_schema_table_unique_location(schema_name, table_name) else: raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}") diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql index 52c0f8b5..642400bb 100644 --- a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -57,7 +57,6 @@ {%- set table_properties_formatted = [] -%} {%- set dest_columns_with_type = [] -%} - {{ print("default_s3_data_naming:" ~ default_s3_data_naming) }} {{ print("s3_data_naming:" ~ s3_data_naming) }} {%- for k in table_properties -%} From d389ce1fec0f397804c65c8fe6e928f2f243de41 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sat, 19 Nov 2022 23:55:03 +0100 Subject: [PATCH 5/8] remove print --- .../athena/macros/materializations/models/helpers_iceberg.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql index 642400bb..34fe0af8 100644 --- a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -57,8 +57,6 @@ {%- set table_properties_formatted = [] -%} {%- set dest_columns_with_type = [] -%} - {{ print("s3_data_naming:" ~ s3_data_naming) }} - {%- for k in table_properties -%} {% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") 
-%} {%- endfor -%} From f64627c481964fae8a4e7d0a869c360f660c9769 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sun, 20 Nov 2022 00:46:12 +0100 Subject: [PATCH 6/8] fix seeds --- dbt/include/athena/macros/materializations/seeds/helpers.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/include/athena/macros/materializations/seeds/helpers.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql index 057d5d0d..67fe37d5 100644 --- a/dbt/include/athena/macros/materializations/seeds/helpers.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -10,6 +10,7 @@ {% macro athena__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} + {%- set s3_data_naming = model['config'].get('s3_data_naming', target.s3_data_naming) -%} {% set sql %} create external table {{ this.render() }} ( @@ -21,7 +22,7 @@ {%- endfor -%} ) stored as parquet - location '{{ adapter.s3_table_location(model["schema"], model["alias"]) }}' + location '{{ adapter.s3_table_location(s3_data_naming, model["schema"], model["alias"]) }}' tblproperties ('classification'='parquet') {% endset %} From 33c1a9299190e6849b07e0d974bece02fc216613 Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sun, 20 Nov 2022 15:27:52 +0100 Subject: [PATCH 7/8] tweak implementation --- dbt/adapters/athena/connections.py | 2 +- dbt/adapters/athena/impl.py | 43 ++++++++---------------------- 2 files changed, 12 insertions(+), 33 deletions(-) diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 54cbc1aa..bd0772fc 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -46,7 +46,7 @@ class AthenaCredentials(Credentials): _ALIASES = {"catalog": "database"} num_retries: Optional[int] = 5 s3_data_dir: 
Optional[str] = None - s3_data_naming: Optional[str] = "schema_table" + s3_data_naming: Optional[str] = "schema_table_unique" @property def type(self) -> str: diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index f4f85f38..45b2dddc 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -59,46 +59,25 @@ def s3_table_prefix(self) -> str: else: return path.join(creds.s3_staging_dir, "tables") - @available - def s3_uuid_table_location(self): - """ - Returns a random location for storing a table, using a UUID as - the final directory part - """ - return path.join(self.s3_table_prefix(), str(uuid4())) + "/" - - @available - def s3_schema_table_location(self, schema_name: str, table_name: str) -> str: - """ - Returns a fixed location for storing a table determined by the - (athena) schema and table name - """ - return path.join(self.s3_table_prefix(), schema_name, table_name) + "/" - - @available - def s3_schema_table_unique_location(self, schema_name: str, table_name: str) -> str: - """ - Returns a fixed location for storing a table determined by the - (athena) schema and table name with a final unique UUID. 
- Helpful when working with Iceberg and table renaming - """ - return path.join(self.s3_table_prefix(), schema_name, table_name, str(uuid4())) + "/" - @available def s3_table_location(self, s3_data_naming: str, schema_name: str, table_name: str) -> str: """ Returns either a UUID or database/table prefix for storing a table, depending on the value of s3_table """ - if s3_data_naming == "uuid": - return self.s3_uuid_table_location() - elif s3_data_naming == "schema_table": - return self.s3_schema_table_location(schema_name, table_name) - elif s3_data_naming == "schema_table_unique": - return self.s3_schema_table_unique_location(schema_name, table_name) - else: + mapping = { + "uuid": path.join(self.s3_table_prefix(), str(uuid4())) + "/", + "schema_table": path.join(self.s3_table_prefix(), schema_name, table_name) + "/", + "schema_table_unique": path.join(self.s3_table_prefix(), schema_name, table_name, str(uuid4())) + "/", + } + + table_location = mapping.get(s3_data_naming) + + if table_location is None: raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}") + return table_location + @available def clean_up_partitions(self, database_name: str, table_name: str, where_condition: str): # Look up Glue partitions & clean up From a877bbe1ffde3a6a2899fdd2d0c9f5757e1a4bbc Mon Sep 17 00:00:00 2001 From: nicor88 <6278547+nicor88@users.noreply.github.com> Date: Sun, 20 Nov 2022 20:36:30 +0100 Subject: [PATCH 8/8] data_dir in conf and update readme --- README.md | 42 +++++++++++-------- dbt/adapters/athena/impl.py | 17 ++++---- .../models/helpers_iceberg.sql | 5 ++- .../models/table/create_table_as.sql | 3 +- .../macros/materializations/seeds/helpers.sql | 3 +- 5 files changed, 41 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 82acdb8b..bb80a84c 100644 --- a/README.md +++ b/README.md @@ -47,18 +47,19 @@ stored login info. 
You can configure the AWS profile name to use via `aws_profil A dbt profile can be configured to run against AWS Athena using the following configuration: -| Option | Description | Required? | Example | -|---------------- |--------------------------------------------------------------------------------|----------- |-------------------- | -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| Option | Description | Required? | Example | +|---------------- |--------------------------------------------------------------------------------|----------- |-----------------------| +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | | s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | -| s3_data_naming | How to generate table paths in `s3_data_dir`: `uuid/schema_table/schema_table_unique` | Optional | `schema_table` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| aws_profile_name| Profile to use from your AWS shared credentials file. 
| Optional | `my-profile` | -| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| num_retries| Number of times to retry a failing query | Optional | `3` | `5` +| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | +| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| num_retries| Number of times to retry a failing query | Optional | `3` | `5` + **Example profiles.yml entry:** ```yaml @@ -110,13 +111,18 @@ _Additional information_ The location in which a table is saved is determined by: 1. If `external_location` is defined, that value is used. -2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming`: - + `s3_data_naming=uuid`: `{s3_data_dir}/{uuid4()}/` - + `s3_data_naming=schema_table`: `{s3_data_dir}/{schema}/{table}/` - + `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()/` -3. Otherwise, the staging dir location used by the adapter is used by default. - -It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config. +2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` +3. 
If `s3_data_dir` is not defined data is stored under `s3_staging_dir/tables/` + +Here are all the options available for `s3_data_naming`: +* `uuid`: `{s3_data_dir}/{uuid4()}/` +* `table`: `{s3_data_dir}/{table}/` +* `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` +* `schema_table`: `{s3_data_dir}/{schema}/{table}/` +* `schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` + +It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, +or setting the value for groups of models in dbt_project.yml #### Supported functionality diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 9252b22d..37ad06e9 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -46,7 +46,7 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str: return "timestamp" @available - def s3_table_prefix(self) -> str: + def s3_table_prefix(self, s3_data_dir: str) -> str: """ Returns the root location for storing tables in S3. This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not.
@@ -55,21 +55,24 @@ def s3_table_prefix(self) -> str: """ conn = self.connections.get_thread_connection() creds = conn.credentials - if creds.s3_data_dir is not None: - return creds.s3_data_dir + if s3_data_dir is not None: + return s3_data_dir else: return path.join(creds.s3_staging_dir, "tables") @available - def s3_table_location(self, s3_data_naming: str, schema_name: str, table_name: str) -> str: + def s3_table_location(self, s3_data_dir: str, s3_data_naming: str, schema_name: str, table_name: str) -> str: """ Returns either a UUID or database/table prefix for storing a table, depending on the value of s3_table """ mapping = { - "uuid": path.join(self.s3_table_prefix(), str(uuid4())) + "/", - "schema_table": path.join(self.s3_table_prefix(), schema_name, table_name) + "/", - "schema_table_unique": path.join(self.s3_table_prefix(), schema_name, table_name, str(uuid4())) + "/", + "uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())) + "/", + "table": path.join(self.s3_table_prefix(s3_data_dir), table_name) + "/", + "table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())) + "/", + "schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name) + "/", + "schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4())) + + "/", } table_location = mapping.get(s3_data_naming) diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql index 34fe0af8..ef14a720 100644 --- a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -53,9 +53,10 @@ {%- set partitioned_by = config.get('partitioned_by', default=none) -%} {%- set table_properties = config.get('table_properties', default={}) -%} {%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%} - {%- set s3_data_naming = 
config.get('s3_data_naming', default=target.s3_data_naming) -%} {%- set table_properties_formatted = [] -%} {%- set dest_columns_with_type = [] -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} + {%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%} {%- for k in table_properties -%} {% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%} @@ -64,7 +65,7 @@ {%- set table_properties_csv= table_properties_formatted | join(', ') -%} {%- if external_location is none %} - {%- set external_location = adapter.s3_table_location(s3_data_naming, relation.schema, relation.identifier) -%} + {%- set external_location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) -%} {%- endif %} {%- for col in dest_columns -%} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql index 47ac0352..82cf391a 100644 --- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -6,6 +6,7 @@ {%- set field_delimiter = config.get('field_delimiter', default=none) -%} {%- set format = config.get('format', default='parquet') -%} {%- set write_compression = config.get('write_compression', default=none) -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} {%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%} create table @@ -15,7 +16,7 @@ {%- if external_location is not none and not temporary %} external_location='{{ external_location }}', {%- else -%} - external_location='{{ adapter.s3_table_location(s3_data_naming, relation.schema, relation.identifier) }}', + external_location='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) 
}}', {%- endif %} {%- if partitioned_by is not none %} partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, diff --git a/dbt/include/athena/macros/materializations/seeds/helpers.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql index 67fe37d5..2ce2956c 100644 --- a/dbt/include/athena/macros/materializations/seeds/helpers.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -10,6 +10,7 @@ {% macro athena__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} {%- set s3_data_naming = model['config'].get('s3_data_naming', target.s3_data_naming) -%} {% set sql %} @@ -22,7 +23,7 @@ {%- endfor -%} ) stored as parquet - location '{{ adapter.s3_table_location(s3_data_naming, model["schema"], model["alias"]) }}' + location '{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, model["schema"], model["alias"]) }}' tblproperties ('classification'='parquet') {% endset %}