Skip to content

Commit

Permalink
fix: add catalog_id to glue_client function calls
Browse files: browse the repository at this point in the history
  • Loading branch information
brunofaustino committed Aug 11, 2023
1 parent 7e881b3 commit 3927130
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,16 +529,25 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
conn = self.connections.get_thread_connection()
client = conn.handle

data_catalog = self._get_data_catalog(src_relation.database)
src_catalog_id = get_catalog_id(data_catalog)

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

src_table = glue_client.get_table(DatabaseName=src_relation.schema, Name=src_relation.identifier).get("Table")
src_table = glue_client.get_table(
CatalogId=src_catalog_id, DatabaseName=src_relation.schema, Name=src_relation.identifier
).get("Table")

src_table_partitions = glue_client.get_partitions(
DatabaseName=src_relation.schema, TableName=src_relation.identifier
CatalogId=src_catalog_id, DatabaseName=src_relation.schema, TableName=src_relation.identifier
).get("Partitions")

data_catalog = self._get_data_catalog(src_relation.database)
target_catalog_id = get_catalog_id(data_catalog)

target_table_partitions = glue_client.get_partitions(
DatabaseName=target_relation.schema, TableName=target_relation.identifier
CatalogId=target_catalog_id, DatabaseName=target_relation.schema, TableName=target_relation.identifier
).get("Partitions")

target_table_version = {
Expand All @@ -551,7 +560,9 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
}

# perform a table swap
glue_client.update_table(DatabaseName=target_relation.schema, TableInput=target_table_version)
glue_client.update_table(
CatalogId=target_catalog_id, DatabaseName=target_relation.schema, TableInput=target_table_version
)
LOGGER.debug(f"Table {target_relation.render()} swapped with the content of {src_relation.render()}")

# we delete the target table partitions in any case
Expand All @@ -560,6 +571,7 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
if target_table_partitions:
for partition_batch in get_chunks(target_table_partitions, AthenaAdapter.BATCH_DELETE_PARTITION_API_LIMIT):
glue_client.batch_delete_partition(
CatalogId=target_catalog_id,
DatabaseName=target_relation.schema,
TableName=target_relation.identifier,
PartitionsToDelete=[{"Values": partition["Values"]} for partition in partition_batch],
Expand All @@ -568,6 +580,7 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
if src_table_partitions:
for partition_batch in get_chunks(src_table_partitions, AthenaAdapter.BATCH_CREATE_PARTITION_API_LIMIT):
glue_client.batch_create_partition(
CatalogId=target_catalog_id,
DatabaseName=target_relation.schema,
TableName=target_relation.identifier,
PartitionInputList=[
Expand Down Expand Up @@ -609,6 +622,9 @@ def expire_glue_table_versions(
conn = self.connections.get_thread_connection()
client = conn.handle

data_catalog = self._get_data_catalog(relation.database)
catalog_id = get_catalog_id(data_catalog)

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

Expand All @@ -621,7 +637,10 @@ def expire_glue_table_versions(
location = v["Table"]["StorageDescriptor"]["Location"]
try:
glue_client.delete_table_version(
DatabaseName=relation.schema, TableName=relation.identifier, VersionId=str(version)
CatalogId=catalog_id,
DatabaseName=relation.schema,
TableName=relation.identifier,
VersionId=str(version)
)
deleted_versions.append(version)
LOGGER.debug(f"Deleted version {version} of table {relation.render()} ")
Expand Down Expand Up @@ -653,13 +672,16 @@ def persist_docs_to_glue(
conn = self.connections.get_thread_connection()
client = conn.handle

data_catalog = self._get_data_catalog(relation.database)
catalog_id = get_catalog_id(data_catalog)

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

# By default, there is no need to update Glue Table
need_udpate_table = False
# Get Table from Glue
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
table = glue_client.get_table(CatalogId=catalog_id, DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
# Update table description
Expand Down Expand Up @@ -699,7 +721,10 @@ def persist_docs_to_glue(
# It prevents redundant schema version creating after incremental runs.
if need_udpate_table:
glue_client.update_table(
DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
CatalogId=catalog_id,
DatabaseName=relation.schema,
TableInput=updated_table,
SkipArchive=skip_archive_table_version
)

@available
Expand Down Expand Up @@ -730,11 +755,14 @@ def get_columns_in_relation(self, relation: AthenaRelation) -> List[AthenaColumn
conn = self.connections.get_thread_connection()
client = conn.handle

data_catalog = self._get_data_catalog(relation.database)
catalog_id = get_catalog_id(data_catalog)

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

try:
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.identifier)["Table"]
table = glue_client.get_table(CatalogId=catalog_id, DatabaseName=relation.schema, Name=relation.identifier)["Table"]
except ClientError as e:
if e.response["Error"]["Code"] == "EntityNotFoundException":
LOGGER.debug("table not exist, catching the error")
Expand Down

0 comments on commit 3927130

Please sign in to comment.