Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Added get_partition pagination #82

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
21 changes: 13 additions & 8 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,19 @@ def clean_up_partitions(
with boto3_client_lock:
glue_client = boto3.client('glue', region_name=client.region_name)
s3_resource = boto3.resource('s3', region_name=client.region_name)
partitions = glue_client.get_partitions(
# CatalogId='123456789012', # Need to make this configurable if it is different from default AWS Account ID
DatabaseName=database_name,
TableName=table_name,
Expression=where_condition
)
p = re.compile('s3://([^/]*)/(.*)')
for partition in partitions["Partitions"]:
paginator = glue_client.get_paginator("get_partitions")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you have a chance, can you check what would happen if the current user doesn't have enough permission to fetch the partitions?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you have a chance, can you check what would happen if the current user doesn't have enough permission to fetch the partitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You bet. I should have some time to look into it tomorrow to check the permission case and run through the proper dbt-athena testing.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DVerzal
Thank you.
While I debug #84, I noticed that there could be unexpected behaviors when Error in API is not handled properly :D

I am working on making a PR to handle #84

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have validated that the dbt run fails before doing any work when permission to access the permissions is not available:

14:48:34  Runtime Error in model epic_835 (models/test.sql)
14:48:34    Insufficient permissions to execute the query.  User: dustin.verzal is not authorized to perform: glue:GetPartitions on resource: arn:aws:glue:us-east-2:********:catalog with an explicit deny in an identity-based policy . You may need to manually clean the data at location 'PATH' before retrying. Athena will not delete data in your account.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DVerzal this is an issue due to missing permissions. You are missing a glue:GetPartitions on the table.

partition_params = {
"DatabaseName": database_name,
"TableName": table_name,
"Expression": where_condition,
"ExcludeColumnSchema": True,
}
partition_pg = paginator.paginate(**partition_params)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternatively you could do this

partitions  = partition_pg.build_full_result().get('Partitions')

Doing so you avoid a for loop, that will happen automatically when calling build_full_result method.

partitions = []
for pg in partition_pg:
partitions.extend(pg["Partitions"])
p = re.compile("s3://([^/]*)/(.*)")
for partition in partitions:
logger.debug("Deleting objects for partition '{}' at '{}'", partition["Values"], partition["StorageDescriptor"]["Location"])
m = p.match(partition["StorageDescriptor"]["Location"])
if m is not None:
Expand Down