feat: support S3 Table Buckets with S3TablesCatalog #1429
base: main
Conversation
I was able to work around the issue above by using …
Thanks for working on this @felixscherz. Feel free to tag me when it's ready for review :)
I think you can now review this PR if you have time @kevinjqliu :) I currently run tests by setting the …
Thanks for the PR, I added a few comments to clarify the catalog behaviors.
I'm a little hesitant to merge this in given that we have to run tests against a production S3 endpoint. Maybe we can mock the endpoint?
I ran the tests locally, and these 3 tests failed; everything else is ✅.
Thank you for the review! I removed tests related to boto3 and set the AWS region explicitly for the test run.
Added a few more comments.
I was able to run the tests locally:

AWS_REGION=us-east-2 ARN=... poetry run pytest tests/catalog/test_s3tables.py

after making a few local changes:
- run poetry update boto3
- add an aws_region fixture (a sketch follows this list)
- pass aws_region to the catalog
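A hypothetical version of that fixture; the fixture name comes from the list above, but the body is an assumption rather than the PR's actual code:

import os

import pytest

@pytest.fixture
def aws_region() -> str:
    # Pull the region from the environment, matching the test command above.
    return os.environ["AWS_REGION"]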
Could you update the PR description so others can test this PR out?
Force-pushed from 398e2d7 to 05e4dfd
@felixscherz Thanks for the great contribution! Looking forward to adding this to PyIceberg! I left some comments. Please let me know what you think.
def commit_table(
    self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
I did not find the logic for the case when the table does not exist, which means create_table_transaction will not be supported in the current version.
iceberg-python/pyiceberg/catalog/__init__.py
Lines 754 to 765 in e41c428
def create_table_transaction(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    location: Optional[str] = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
    return CreateTableTransaction(
        self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
    )
We do not have to support everything in the initial PR, but it would be good to override create_table_transaction to raise "Not Implemented" for s3tables.
I added exceptions for this case for now, along with a test. I will have a look at how to implement this properly.
pyiceberg/catalog/s3tables.py (Outdated)
try:
    self.s3tables.create_table(
        tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name, format="ICEBERG"
    )
If anything goes wrong after this point, I think we should clean up the created s3 table via the s3tables delete_table endpoint.
I added a try/except to delete the s3 table in case something goes wrong with writing the initial metadata.
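A simplified sketch of that rollback, built around the create_table call shown above; the metadata-writing call and its arguments are illustrative:

self.s3tables.create_table(
    tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name, format="ICEBERG"
)
try:
    # Write the initial Iceberg metadata for the freshly created table.
    _write_metadata(metadata, io, metadata_location)  # illustrative arguments
except Exception:
    # Roll back so a failed metadata write does not leave an orphaned table.
    self.s3tables.delete_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name)
    raise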
can you run …
Force-pushed from 894cbc9 to 2e1c383
@felixscherz could you rebase this against main? I see that getmoto/moto#8470 is now merged, thanks for driving that!
Force-pushed from 99e569b to 54b8e87
I rebased onto main. I prepared the unit tests using the new …
@kevinjqliu
Force-pushed from f30b7e6 to 804a468
Co-authored-by: Honah J. <[email protected]>
Force-pushed from c27b60d to 018bf0b
It's green! I'll review this so we can include it in the upcoming 0.9.0 release.
LGTM! Thanks for adding both the unit test and integration test. And for driving downstream dependency to add support for S3Tables!! (getmoto/moto#8470)
I pushed a few changes to resolve merge conflicts. And I verified the integration test locally:
AWS_REGION=us-east-2 AWS_TEST_S3_TABLE_BUCKET_ARN=arn:aws:s3tables:us-east-2:033327485438:bucket/s3-table poetry run pytest tests/catalog/integration_test_s3tables.py
Before:
def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

After:
def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str, overwrite: bool = False) -> None:
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path), overwrite=overwrite)
👍 default is False
iceberg-python/pyiceberg/serializers.py
Line 123 in 300b840
def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
try:
    self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT))
except boto3.session.UnknownServiceError as e:
    raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e
Need to make this an f-string so that the boto3 version can be interpolated.
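With the f prefix added, the line reads:

raise S3TablesError(f"'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e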
raise NotImplementedError("Namespace properties are read only.")

def purge_table(self, identifier: Union[str, Identifier]) -> None:
    # purge is not supported as s3tables doesn't support delete operations
IIRC: the S3 Tables API only wants users to drop tables if they're specifying a purge, because they will lose access to their data.
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-delete.html
Java impl: https://github.com/awslabs/s3-tables-catalog/blob/main/src/software/amazon/s3tables/iceberg/S3TablesCatalog.java#L439
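If the Java behavior were mirrored instead of raising, purge could simply delegate to drop; this is a sketch of that alternative, not the PR's decision:

def purge_table(self, identifier: Union[str, Identifier]) -> None:
    # Dropping an s3tables table already revokes access to its data,
    # so purge and drop amount to the same operation here.
    self.drop_table(identifier)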
namespace = self._validate_namespace_identifier(namespace)
paginator = self.s3tables.get_paginator("list_tables")
tables: List[Identifier] = []
for page in paginator.paginate(tableBucketARN=self.table_bucket_arn, namespace=namespace):
NIT: catch and throw the NoSuchNamespaceError to align with catalog exceptions
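A sketch of that mapping around the pagination loop above, assuming the s3tables client models a NotFoundException and that each page carries a tables list:

try:
    for page in paginator.paginate(tableBucketARN=self.table_bucket_arn, namespace=namespace):
        for table in page["tables"]:
            tables.append((namespace, table["name"]))
except self.s3tables.exceptions.NotFoundException as e:
    # Translate the service error into the catalog's own exception type.
    raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e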
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
    namespace = self._validate_namespace_identifier(namespace)
    response = self.s3tables.get_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace)
NIT: same as above, catch and throw the NoSuchNamespaceError to align with catalog exceptions
def __init__(self, name: str, **properties: str):
    super().__init__(name, **properties)

    self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
What do you think about placing an assertion on this property to require the user to set the table bucket ARN? Similar to the Java implementation.
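A sketch of that check, assuming the S3TABLES_TABLE_BUCKET_ARN constant and the S3TablesError type from this PR:

def __init__(self, name: str, **properties: str):
    super().__init__(name, **properties)

    if S3TABLES_TABLE_BUCKET_ARN not in self.properties:
        # Fail fast, similar to the required-ARN check in the Java catalog.
        raise S3TablesError(f"Missing required property: {S3TABLES_TABLE_BUCKET_ARN}")
    self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]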
Hi, this is in regards to #1404.

I created a first draft of an S3TablesCatalog that uses the S3 Table Bucket API for catalog operations.

How to run tests

Since moto does not support mocking the S3 Tables API yet (WIP: getmoto/moto#8470), we have to run tests against a live AWS account. To do that, create an S3 table bucket in one of the supported regions and then set the table bucket ARN and AWS region as environment variables.
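For orientation, a hedged sketch of how the catalog could then be constructed in a test, reusing the environment variable names from the integration-test command above; the property key here is illustrative, not a confirmed name:

import os

from pyiceberg.catalog.s3tables import S3TablesCatalog

# AWS_REGION and AWS_TEST_S3_TABLE_BUCKET_ARN must be exported beforehand.
catalog = S3TablesCatalog(
    "s3tables",
    **{"s3tables.table-bucket-arn": os.environ["AWS_TEST_S3_TABLE_BUCKET_ARN"]},  # illustrative key
)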