|
39 | 39 | from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema |
40 | 40 | from pyiceberg.table.metadata import INITIAL_SPEC_ID |
41 | 41 | from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder |
42 | | -from pyiceberg.transforms import DayTransform, IdentityTransform |
| 42 | +from pyiceberg.transforms import BucketTransform, DayTransform, IdentityTransform |
43 | 43 | from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType |
44 | 44 | from tests.conftest import clean_up |
45 | 45 |
|
@@ -503,6 +503,69 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str) |
503 | 503 | assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"] |
504 | 504 |
|
505 | 505 |
|
| 506 | +@pytest.mark.integration |
| 507 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 508 | +def test_update_table_spec(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: |
| 509 | + identifier = (database_name, table_name) |
| 510 | + test_catalog.create_namespace(database_name) |
| 511 | + table = test_catalog.create_table(identifier, test_schema) |
| 512 | + |
| 513 | + with table.update_spec() as update: |
| 514 | + update.add_field(source_column_name="VendorID", transform=BucketTransform(16), partition_field_name="shard") |
| 515 | + |
| 516 | + loaded = test_catalog.load_table(identifier) |
| 517 | + expected_spec = PartitionSpec( |
| 518 | + PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="shard"), spec_id=1 |
| 519 | + ) |
| 520 | + # The spec ID may not match, so check equality of the fields |
| 521 | + assert loaded.spec() == expected_spec |
| 522 | + |
| 523 | + |
| 524 | +@pytest.mark.integration |
| 525 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 526 | +def test_update_table_spec_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: |
| 527 | + identifier = (database_name, table_name) |
| 528 | + test_catalog.create_namespace(database_name) |
| 529 | + spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket")) |
| 530 | + table = test_catalog.create_table(identifier, test_schema, partition_spec=spec) |
| 531 | + |
| 532 | + update = table.update_spec() |
| 533 | + update.add_field(source_column_name="tpep_pickup_datetime", transform=BucketTransform(16), partition_field_name="shard") |
| 534 | + |
| 535 | + # update with conflict |
| 536 | + conflict_table = test_catalog.load_table(identifier) |
| 537 | + with conflict_table.update_spec() as conflict_update: |
| 538 | + conflict_update.remove_field("id_bucket") |
| 539 | + |
| 540 | + with pytest.raises( |
| 541 | + CommitFailedException, match="Requirement failed: default spec id has changed|default partition spec changed" |
| 542 | + ): |
| 543 | + update.commit() |
| 544 | + |
| 545 | + loaded = test_catalog.load_table(identifier) |
| 546 | + assert loaded.spec() == PartitionSpec(spec_id=1) |
| 547 | + |
| 548 | + |
| 549 | +@pytest.mark.integration |
| 550 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 551 | +def test_update_table_spec_then_revert(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: |
| 552 | + identifier = (database_name, table_name) |
| 553 | + test_catalog.create_namespace(database_name) |
| 554 | + |
| 555 | + initial_spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket")) |
| 556 | + |
| 557 | + table = test_catalog.create_table(identifier, test_schema, partition_spec=initial_spec, properties={"format-version": "2"}) |
| 558 | + assert table.format_version == 2 |
| 559 | + |
| 560 | + with table.update_spec() as update: |
| 561 | + update.add_identity(source_column_name="tpep_pickup_datetime") |
| 562 | + |
| 563 | + with table.update_spec() as update: |
| 564 | + update.remove_field("tpep_pickup_datetime") |
| 565 | + |
| 566 | + assert table.spec() == initial_spec |
| 567 | + |
| 568 | + |
506 | 569 | @pytest.mark.integration |
507 | 570 | @pytest.mark.parametrize("test_catalog", CATALOGS) |
508 | 571 | def test_register_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: |
|
0 commit comments