From fb724f0c8989c684259c24ebad3cefbefdc43432 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Thu, 26 Dec 2024 11:10:59 -0800 Subject: [PATCH] [590] Add interfaces for CatalogSyncClient and CatalogSync --- pom.xml | 15 +- rfc/template.md | 55 -------- xtable-api/pom.xml | 4 + .../apache/xtable/conversion/SourceTable.java | 2 +- .../model/catalog/CatalogTableIdentifier.java | 25 ++++ .../catalog/HierarchicalTableIdentifier.java | 34 +++++ .../ThreePartHierarchicalTableIdentifier.java | 105 ++++++++++++++ .../exception/CatalogRefreshException.java | 31 +++++ .../xtable/model/exception/ErrorCode.java | 3 +- .../apache/xtable/model/sync/SyncResult.java | 28 +++- .../extractor/CatalogConversionSource.java | 32 +++++ .../spi/extractor/ConversionSource.java | 9 ++ .../apache/xtable/spi/sync/CatalogSync.java | 129 ++++++++++++++++++ .../xtable/spi/sync/CatalogSyncClient.java | 71 ++++++++++ .../apache/xtable/spi/sync/CatalogUtils.java | 63 +++++++++ .../xtable/spi/sync/TableFormatSync.java | 13 +- ...tThreePartHierarchicalTableIdentifier.java | 50 +++++++ .../xtable/spi/sync/TestCatalogSync.java | 128 +++++++++++++++++ .../xtable/spi/sync/TestCatalogUtils.java | 72 ++++++++++ .../xtable/spi/sync/TestTableFormatSync.java | 38 ++++-- .../conversion/ConversionController.java | 4 +- .../xtable/conversion/ConversionUtils.java | 57 -------- .../xtable/delta/DeltaConversionSource.java | 7 + .../xtable/hudi/HudiConversionSource.java | 13 ++ .../iceberg/IcebergConversionSource.java | 7 + .../apache/xtable/ITConversionController.java | 3 +- .../conversion/TestConversionController.java | 2 +- .../conversion/TestConversionUtils.java | 111 --------------- .../delta/ITDeltaConversionTargetSource.java | 55 +++++--- .../hudi/ITHudiConversionSourceSource.java | 125 +++++++++++++++++ .../ITIcebergConversionTargetSource.java | 69 ++++++++++ .../xtable/iceberg/TestIcebergDataHelper.java | 8 ++ .../apache/xtable/testutil/ITTestUtils.java | 47 +++++++ .../xtable/hudi/sync/XTableSyncTool.java 
| 2 +- 34 files changed, 1148 insertions(+), 269 deletions(-) delete mode 100644 rfc/template.md create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java create mode 100644 xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java create mode 100644 xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java create mode 100644 xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java delete mode 100644 xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java delete mode 100644 xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java create mode 100644 xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java diff --git a/pom.xml b/pom.xml index 99b4fe1a9..7a5973428 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ 3.2.4 3.1.1 2.5.3 + 5.2.0 1.12.2 3.25.5 2.12.20 @@ -438,7 +439,19 @@ org.mockito mockito-core - 4.8.0 + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + test + ${mockito.version} + + + org.mockito + mockito-inline + ${mockito.version} test diff --git a/rfc/template.md b/rfc/template.md deleted file mode 100644 index 75ab32a4a..000000000 --- a/rfc/template.md +++ 
/dev/null @@ -1,55 +0,0 @@ - -# RFC-[number]: [Title] - -## Proposers - -- @ -- @ - -## Approvers -- @ -- @ - -## Status - -GH Feature Request: - -> Please keep the status updated in `rfc/README.md`. - -## Abstract - -Describe the problem you are trying to solve and a brief description of why it’s needed. - -## Background -Introduce any background context which is relevant or necessary to understand the feature and design choices. - -## Implementation -Describe the new thing you want to do in appropriate detail, how it fits into the project architecture.
-Provide a detailed description of how you intend to implement this feature, this may be fairly extensive and have large subsections of its own or it may be a few sentences.
-Use judgement to decide on how detailed the description needs to be based on the scope of the change. If unclear, you can ask questions in dev@xtable.apache.org. - -## Rollout/Adoption Plan - -- Are there any breaking changes as part of this new feature/functionality? -- What impact (if any) will there be on existing users? -- If we are changing behavior how will we phase out the older behavior? When will we remove the existing behavior? -- If we need special migration tools, describe them here. - -## Test Plan - -Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing breaks? \ No newline at end of file diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml index 9436b7752..43aa7bace 100644 --- a/xtable-api/pom.xml +++ b/xtable-api/pom.xml @@ -84,5 +84,9 @@ mockito-core test + + org.mockito + mockito-junit-jupiter + diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java index b37e1c1e2..f3e1c3599 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java @@ -28,7 +28,7 @@ @EqualsAndHashCode(callSuper = true) @Getter public class SourceTable extends ExternalTable { - /** The path to the data files, defaults to the metadataPath */ + /** The path to the data files, defaults to the basePath */ @NonNull private final String dataPath; @Builder(toBuilder = true) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java new file mode 100644 index 000000000..7679035f1 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.catalog; + +/** Represents a catalog table identifier in a multi-level catalog system. */ +public interface CatalogTableIdentifier { + /** Returns the string identifier for the table within its catalog context */ + String getId(); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java new file mode 100644 index 000000000..e6d7cf0eb --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.catalog; + +/** + * Represents a hierarchical table identifier, often including catalog, database (or schema), and + * table names. Some catalogs may omit the catalog name. + */ +public interface HierarchicalTableIdentifier extends CatalogTableIdentifier { + /** @return the catalog name if present, otherwise null */ + String getCatalogName(); + + /** @return the database (or schema) name; required */ + String getDatabaseName(); + + /** @return the table name; required */ + String getTableName(); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java new file mode 100644 index 000000000..2608d36a3 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.catalog; + +import lombok.NonNull; +import lombok.Value; + +/** + * An internal representation of a fully qualified table identifier within a catalog. The naming + * convention follows a three level hierarchy, few examples can be found below. + * + *
<ul> + *
<li>1. catalog.database.table + *
<li>2. catalog.schema.table + *
<li>3. database.schema.table + * </ul>
+ * + * We have selected the first naming convention and will interoperate among other catalogs following + * a different convention. + */ +@Value +public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableIdentifier { + /** + * The top level hierarchy/namespace for organizing tables. Each catalog can have multiple + * databases/schemas. This is an optional field as many catalogs have a "default" catalog whose + * name varies depending on the catalogType. + */ + String catalogName; + /** + * Catalogs have the ability to group tables logically, databaseName is the identifier for such + * logical classification. The alternate names for this field include namespace, schemaName etc. + */ + @NonNull String databaseName; + + /** + * The table name used when syncing the table to the catalog. NOTE: This name can be different + * from the table name in storage. + */ + @NonNull String tableName; + + public ThreePartHierarchicalTableIdentifier( + String catalogName, @NonNull String databaseName, @NonNull String tableName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.tableName = tableName; + } + + public ThreePartHierarchicalTableIdentifier(String databaseName, String tableName) { + this(null, databaseName, tableName); + } + + /** + * Constructs a new {@code ThreePartHierarchicalTableIdentifier} from a hierarchical string + * identifier. + * + *

<p>The identifier is expected to be in one of the following formats: + * + *
<ul> + *
<li>{@code database.table} (two parts, no catalog) + *
<li>{@code catalog.database.table} (three parts) + * </ul>
+ * + * If the identifier does not match one of these formats, an {@link IllegalArgumentException} is + * thrown. + * + * @param hierarchicalTableIdentifier The hierarchical string identifier (e.g., + * "myCatalog.myDatabase.myTable" or "myDatabase.myTable"). + * @throws IllegalArgumentException If the provided string does not match a valid two-part or + * three-part identifier. + */ + public static ThreePartHierarchicalTableIdentifier fromDotSeparatedIdentifier( + String hierarchicalTableIdentifier) { + String[] parts = hierarchicalTableIdentifier.split("\\."); + if (parts.length == 2) { + return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1]); + } else if (parts.length == 3) { + return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1], parts[2]); + } else { + throw new IllegalArgumentException("Invalid table identifier " + hierarchicalTableIdentifier); + } + } + + @Override + public String getId() { + if (catalogName != null && !catalogName.isEmpty()) { + return catalogName + "." + databaseName + "." + tableName; + } + return databaseName + "." + tableName; + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java new file mode 100644 index 000000000..dd3222927 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.exception; + +/** Exception thrown when refresh operation (updating table format metadata) in catalog fails. */ +public class CatalogRefreshException extends InternalException { + + public CatalogRefreshException(String message, Throwable e) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e); + } + + public CatalogRefreshException(String message) { + super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java index 920a95f49..af85c9007 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java @@ -30,7 +30,8 @@ public enum ErrorCode { INVALID_SCHEMA(10006), UNSUPPORTED_SCHEMA_TYPE(10007), UNSUPPORTED_FEATURE(10008), - PARSE_EXCEPTION(10009); + PARSE_EXCEPTION(10009), + CATALOG_REFRESH_EXCEPTION(10010); private final int errorCode; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java index d158b38c8..824f626c3 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.List; import lombok.Builder; import lombok.Value; @@ -30,7 +31,7 @@ * @since 0.1 */ 
@Value -@Builder +@Builder(toBuilder = true) public class SyncResult { // Mode used for the sync SyncMode mode; @@ -38,10 +39,12 @@ public class SyncResult { Instant syncStartTime; // Duration Duration syncDuration; - // Status of the sync - SyncStatus status; + // Status of the tableFormat sync + SyncStatus tableFormatSyncStatus; // The Sync Mode recommended for the next sync (Usually filled on an error) SyncMode recommendedSyncMode; + // The sync status for each catalog. + List catalogSyncStatusList; public enum SyncStatusCode { SUCCESS, @@ -57,6 +60,25 @@ public static class SyncStatus { SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build(); // Status code SyncStatusCode statusCode; + // errorDetails if any + ErrorDetails errorDetails; + } + + /** Represents status for catalog sync status operation */ + @Value + @Builder + public static class CatalogSyncStatus { + // A user defined unique catalog identifier. + String catalogId; + // Status code + SyncStatusCode statusCode; + // errorDetails if any + ErrorDetails errorDetails; + } + + @Value + @Builder + public static class ErrorDetails { // error Message if any String errorMessage; // Readable description of the error diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java new file mode 100644 index 000000000..616f3d459 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.extractor; + +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * A client for converting the table with tableIdentifier {@link CatalogTableIdentifier} in source + * catalog to SourceTable object {@link SourceTable}, can be used by downstream consumers for + * syncing it to multiple {@link org.apache.xtable.conversion.TargetTable} + */ +public interface CatalogConversionSource { + /** Returns the source table object present in the catalog. */ + SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java index 2500454cb..21f7f63f2 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java @@ -41,6 +41,15 @@ public interface ConversionSource extends Closeable { */ InternalTable getTable(COMMIT commit); + /** + * Extracts the {@link InternalTable} as of latest state. This method is less expensive as + * compared to {@link ConversionSource#getCurrentSnapshot()} as it doesn't load the files present + * in the table. + * + * @return {@link InternalTable} representing the current table. + */ + InternalTable getCurrentTable(); + /** * Extracts the {@link InternalSnapshot} as of latest state. 
* diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java new file mode 100644 index 000000000..e76d59068 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.sync.SyncResult; +import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus; + +/** Provides the functionality to sync metadata from InternalTable to multiple target catalogs */ +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CatalogSync { + private static final CatalogSync INSTANCE = new CatalogSync(); + + public static CatalogSync getInstance() { + return INSTANCE; + } + + public SyncResult syncTable( + Map catalogSyncClients, InternalTable table) { + List results = new ArrayList<>(); + Instant startTime = Instant.now(); + catalogSyncClients.forEach( + ((tableIdentifier, catalogSyncClient) -> { + try { + results.add(syncCatalog(catalogSyncClient, tableIdentifier, table)); + log.info( + "Catalog sync is successful for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); + } catch (Exception e) { + log.error( + "Catalog sync failed for table {} with format {} using catalogSync {}", + table.getBasePath(), + table.getTableFormat(), + catalogSyncClient.getClass().getName()); + results.add( + getCatalogSyncFailureStatus( + catalogSyncClient.getCatalogId(), catalogSyncClient.getClass().getName(), e)); + } + })); + return SyncResult.builder() + .lastInstantSynced(table.getLatestCommitTime()) + .syncStartTime(startTime) + .syncDuration(Duration.between(startTime, Instant.now())) + 
.catalogSyncStatusList(results) + .build(); + } + + private CatalogSyncStatus syncCatalog( + CatalogSyncClient
catalogSyncClient, + CatalogTableIdentifier tableIdentifier, + InternalTable table) { + if (!catalogSyncClient.hasDatabase(tableIdentifier)) { + catalogSyncClient.createDatabase(tableIdentifier); + } + TABLE catalogTable = catalogSyncClient.getTable(tableIdentifier); + String storageDescriptorLocation = catalogSyncClient.getStorageLocation(catalogTable); + if (catalogTable == null) { + catalogSyncClient.createTable(table, tableIdentifier); + } else if (hasStorageDescriptorLocationChanged( + storageDescriptorLocation, table.getBasePath())) { + // Replace table if there is a mismatch between hmsTable location and Xtable basePath. + // Possible reasons could be: + // 1) hms table (manually) created with a different location before and need to be + // re-created with a new basePath + // 2) XTable basePath changes due to migration or other reasons + String oldLocation = + StringUtils.isEmpty(storageDescriptorLocation) ? "null" : storageDescriptorLocation; + log.warn( + "StorageDescriptor location changed from {} to {}, re-creating table", + oldLocation, + table.getBasePath()); + catalogSyncClient.createOrReplaceTable(table, tableIdentifier); + } else { + log.debug("Table metadata changed, refreshing table"); + catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier); + } + return CatalogSyncStatus.builder() + .catalogId(catalogSyncClient.getCatalogId()) + .statusCode(SyncResult.SyncStatusCode.SUCCESS) + .build(); + } + + private CatalogSyncStatus getCatalogSyncFailureStatus( + String catalogId, String catalogImpl, Exception e) { + return CatalogSyncStatus.builder() + .catalogId(catalogId) + .statusCode(SyncResult.SyncStatusCode.ERROR) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage(e.getMessage()) + .errorDescription("catalogSync failed for " + catalogImpl) + .build()) + .build(); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java new file mode 100644 index 000000000..62de93793 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * An interface for syncing {@link InternalTable} object to {@link TABLE} object defined by the + * catalog. + * + * @param
+ */ +public interface CatalogSyncClient
extends AutoCloseable { + /** + * Returns the user-defined unique identifier for the catalog, allows user to sync table to + * multiple catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2. + */ + String getCatalogId(); + + /** Returns the storage location of the table synced to the catalog. */ + String getStorageLocation(TABLE table); + + /** Checks whether a database used by tableIdentifier exists in the catalog. */ + boolean hasDatabase(CatalogTableIdentifier tableIdentifier); + + /** Creates a database used by tableIdentifier in the catalog. */ + void createDatabase(CatalogTableIdentifier tableIdentifier); + + /** + * Return the TABLE object used by the catalog implementation. Eg: HMS uses + * org.apache.hadoop.hive.metastore.api.Table, Glue uses + * software.amazon.awssdk.services.glue.model.Table etc. + */ + TABLE getTable(CatalogTableIdentifier tableIdentifier); + + /** + * Create a table in the catalog using the canonical InternalTable representation and + * tableIdentifier. + */ + void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier); + + /** Refreshes the table metadata in the catalog with tableIdentifier. */ + void refreshTable( + InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier); + + /** + * Tries to re-create the table in the catalog replacing state with the new canonical + * InternalTable representation and tableIdentifier. + */ + void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier); + + /** Drops a table from the catalog. 
*/ + void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java new file mode 100644 index 000000000..cd3d6d5de --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import java.net.URI; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; + +import org.apache.xtable.model.exception.CatalogRefreshException; + +/** Utility methods used by CatalogSync. */ +public class CatalogUtils { + + /** + * Returns whether the location of the table in catalog differs from the one currently in + * storage. Locations are compared by authority and path. + * + * @param storageDescriptorLocation location of the table in catalog. + * @param tableBasePath location of the table in source table. + * @return {@code true} if the catalog location is empty or differs from the table base path, + * {@code false} if both locations match. + * @throws CatalogRefreshException if the locations are not equal, both carry a scheme, and + * neither scheme is a prefix of the other.
+ */ + static boolean hasStorageDescriptorLocationChanged( + String storageDescriptorLocation, String tableBasePath) { + + if (StringUtils.isEmpty(storageDescriptorLocation)) { + return true; + } + URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri(); + URI basePathUri = new Path(tableBasePath).toUri(); + + if (storageDescriptorUri.equals(basePathUri) + || storageDescriptorUri.getScheme() == null + || basePathUri.getScheme() == null + || storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme()) + || basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) { + String storageDescriptorLocationIdentifier = + storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath(); + String tableBasePathIdentifier = basePathUri.getAuthority() + basePathUri.getPath(); + return !Objects.equals(storageDescriptorLocationIdentifier, tableBasePathIdentifier); + } + throw new CatalogRefreshException( + String.format( + "Storage scheme has changed for table catalogStorageDescriptorUri %s basePathUri %s", + storageDescriptorUri, basePathUri)); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java index 7cd0b384f..bb3406696 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java @@ -166,7 +166,7 @@ private SyncResult getSyncResult( return SyncResult.builder() .mode(mode) - .status(SyncResult.SyncStatus.SUCCESS) + .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS) .syncStartTime(startTime) .syncDuration(Duration.between(startTime, Instant.now())) .lastInstantSynced(tableState.getLatestCommitTime()) @@ -181,12 +181,15 @@ private interface SyncFiles { private SyncResult buildResultForError(SyncMode mode, Instant startTime, Exception e) { return SyncResult.builder() .mode(mode) - .status( + .tableFormatSyncStatus( 
SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage(e.getMessage()) - .errorDescription("Failed to sync " + mode.name()) - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage(e.getMessage()) + .errorDescription("Failed to sync " + mode.name()) + .canRetryOnFailure(true) + .build()) .build()) .syncStartTime(startTime) .syncDuration(Duration.between(startTime, Instant.now())) diff --git a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java new file mode 100644 index 000000000..5f10be0b5 --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.model.catalog; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestThreePartHierarchicalTableIdentifier { + + @Test + void testGetId() { + ThreePartHierarchicalTableIdentifier catalogTableIdentifier = + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName"); + assertEquals("catalogName.databaseName.tableName", catalogTableIdentifier.getId()); + } + + @Test + void testConstructorForHierarchicalTableIdentifier() { + Assertions.assertDoesNotThrow( + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "catalogName.databaseName.tableName")); + Assertions.assertDoesNotThrow( + () -> + ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier( + "databaseName.tableName")); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName")); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java new file mode 100644 index 000000000..2f4fbdc9f --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.google.common.collect.ImmutableMap; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.sync.SyncResult; + +@ExtendWith(MockitoExtension.class) +public class TestCatalogSync
{ + + @Mock CatalogSyncClient
mockClient1; + @Mock CatalogSyncClient
mockClient2; + @Mock CatalogSyncClient
mockClient3; + @Mock CatalogSyncClient
mockClient4; + + private final ThreePartHierarchicalTableIdentifier tableIdentifier1 = + new ThreePartHierarchicalTableIdentifier("database1", "table1"); + private final ThreePartHierarchicalTableIdentifier tableIdentifier2 = + new ThreePartHierarchicalTableIdentifier("database2", "table2"); + private final ThreePartHierarchicalTableIdentifier tableIdentifier3 = + new ThreePartHierarchicalTableIdentifier("database3", "table3"); + private final ThreePartHierarchicalTableIdentifier tableIdentifier4 = + new ThreePartHierarchicalTableIdentifier("database4", "table4"); + + @Mock TABLE mockTable; + private final InternalTable internalTable = + InternalTable.builder() + .readSchema(InternalSchema.builder().name("test_schema").build()) + .partitioningFields( + Collections.singletonList( + InternalPartitionField.builder() + .sourceField(InternalField.builder().name("partition_field").build()) + .transformType(PartitionTransformType.VALUE) + .build())) + .latestCommitTime(Instant.now().minus(10, ChronoUnit.MINUTES)) + .basePath("/tmp/test") + .build(); + + @Test + void testSyncTable() { + when(mockClient1.hasDatabase(tableIdentifier1)).thenReturn(false); + when(mockClient2.hasDatabase(tableIdentifier2)).thenReturn(true); + when(mockClient3.hasDatabase(tableIdentifier3)).thenReturn(true); + when(mockClient4.hasDatabase(tableIdentifier4)) + .thenThrow(new UnsupportedOperationException("No catalog impl")); + + when(mockClient1.getTable(tableIdentifier1)).thenReturn(mockTable); + when(mockClient2.getTable(tableIdentifier2)).thenReturn(null); + when(mockClient3.getTable(tableIdentifier3)).thenReturn(mockTable); + + when(mockClient1.getStorageLocation(any())).thenReturn("/tmp/test_changed"); + when(mockClient2.getStorageLocation(any())).thenReturn("/tmp/test"); + when(mockClient3.getStorageLocation(any())).thenReturn("/tmp/test"); + + when(mockClient4.getCatalogId()).thenReturn("catalogId4"); + + Map catalogSyncClients = + ImmutableMap.of( + tableIdentifier1, mockClient1, + 
tableIdentifier2, mockClient2, + tableIdentifier3, mockClient3, + tableIdentifier4, mockClient4); + + List results = + CatalogSync.getInstance() + .syncTable(catalogSyncClients, internalTable) + .getCatalogSyncStatusList(); + List errorStatus = + results.stream() + .filter(status -> status.getStatusCode().equals(SyncResult.SyncStatusCode.ERROR)) + .collect(Collectors.toList()); + assertEquals(SyncResult.SyncStatusCode.ERROR, errorStatus.get(0).getStatusCode()); + assertEquals( + 3, + results.stream() + .map(SyncResult.CatalogSyncStatus::getStatusCode) + .filter(statusCode -> statusCode.equals(SyncResult.SyncStatusCode.SUCCESS)) + .count()); + + verify(mockClient1, times(1)).createDatabase(tableIdentifier1); + verify(mockClient1, times(1)).createOrReplaceTable(internalTable, tableIdentifier1); + verify(mockClient2, times(1)).createTable(eq(internalTable), eq(tableIdentifier2)); + verify(mockClient3, times(1)).refreshTable(eq(internalTable), any(), eq(tableIdentifier3)); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java new file mode 100644 index 000000000..69575e8e7 --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.spi.sync; + +import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.xtable.model.exception.CatalogRefreshException; + +public class TestCatalogUtils { + + static Stream storageLocationTestArgs() { + return Stream.of( + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true), + Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v2/", true), + Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false), + Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false), + Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false), + Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false), + Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/", false), + Arguments.of("file:///var/lib/bucket/table/v1", "file:///var/lib/bucket/table/v1/", false)); + } + + static Stream storageLocationTestArgsException() { + return Stream.of( + Arguments.of( + "s3://bucket/table/v1", + "gs://bucket/table/v1", + new CatalogRefreshException( + "Storage scheme has changed for table catalogStorageDescriptorUri 
s3://bucket/table/v1 basePathUri gs://bucket/table/v1"))); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgs") + void testHasStorageLocationChanged(String storageLocation, String basePath, boolean expected) { + assertEquals(expected, hasStorageDescriptorLocationChanged(storageLocation, basePath)); + } + + @ParameterizedTest + @MethodSource("storageLocationTestArgsException") + void testHasStorageLocationChangedException( + String storageLocation, String basePath, Exception exception) { + assertThrows( + exception.getClass(), + () -> hasStorageDescriptorLocationChanged(storageLocation, basePath), + exception.getMessage()); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java index 2a9e05886..39480f8b0 100644 --- a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java +++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java @@ -88,7 +88,7 @@ void syncSnapshotWithFailureForOneFormat() { assertEquals(2, result.size()); SyncResult successResult = result.get(TableFormat.DELTA); - assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getTableFormatSyncStatus()); assertEquals(SyncMode.FULL, successResult.getMode()); assertEquals(startingTableState.getLatestCommitTime(), successResult.getLastInstantSynced()); assertSyncResultTimes(successResult, start); @@ -99,11 +99,14 @@ void syncSnapshotWithFailureForOneFormat() { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync FULL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync FULL") + .canRetryOnFailure(true) + .build()) .build(), - failureResult.getStatus()); + 
failureResult.getTableFormatSyncStatus()); verifyBaseConversionTargetCalls( mockConversionTarget2, startingTableState, pendingCommitInstants); @@ -168,7 +171,8 @@ void syncChangesWithFailureForOneFormat() { assertEquals( tableChanges.get(0).getTableAsOfChange().getLatestCommitTime(), partialSuccessResults.get(0).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, partialSuccessResults.get(0).getStatus()); + assertEquals( + SyncResult.SyncStatus.SUCCESS, partialSuccessResults.get(0).getTableFormatSyncStatus()); assertSyncResultTimes(partialSuccessResults.get(0), start); assertEquals(SyncMode.INCREMENTAL, partialSuccessResults.get(1).getMode()); @@ -176,11 +180,14 @@ void syncChangesWithFailureForOneFormat() { assertEquals( SyncResult.SyncStatus.builder() .statusCode(SyncResult.SyncStatusCode.ERROR) - .errorMessage("Failure") - .errorDescription("Failed to sync INCREMENTAL") - .canRetryOnFailure(true) + .errorDetails( + SyncResult.ErrorDetails.builder() + .errorMessage("Failure") + .errorDescription("Failed to sync INCREMENTAL") + .canRetryOnFailure(true) + .build()) .build(), - partialSuccessResults.get(1).getStatus()); + partialSuccessResults.get(1).getTableFormatSyncStatus()); // Assert that all 3 changes are properly synced to the other format List successResults = result.get(TableFormat.DELTA); @@ -190,7 +197,7 @@ void syncChangesWithFailureForOneFormat() { assertEquals( tableChanges.get(i).getTableAsOfChange().getLatestCommitTime(), successResults.get(i).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, successResults.get(i).getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, successResults.get(i).getTableFormatSyncStatus()); assertSyncResultTimes(successResults.get(i), start); } @@ -257,7 +264,8 @@ void syncChangesWithDifferentFormatsAndMetadata() { assertEquals(2, conversionTarget1Results.size()); for (SyncResult conversionTarget1Result : conversionTarget1Results) { assertEquals(SyncMode.INCREMENTAL, 
conversionTarget1Result.getMode()); - assertEquals(SyncResult.SyncStatus.SUCCESS, conversionTarget1Result.getStatus()); + assertEquals( + SyncResult.SyncStatus.SUCCESS, conversionTarget1Result.getTableFormatSyncStatus()); assertSyncResultTimes(conversionTarget1Result, start); } assertEquals( @@ -275,7 +283,9 @@ void syncChangesWithDifferentFormatsAndMetadata() { assertEquals( tableChanges.get(i + 1).getTableAsOfChange().getLatestCommitTime(), conversionTarget2Results.get(i).getLastInstantSynced()); - assertEquals(SyncResult.SyncStatus.SUCCESS, conversionTarget2Results.get(i).getStatus()); + assertEquals( + SyncResult.SyncStatus.SUCCESS, + conversionTarget2Results.get(i).getTableFormatSyncStatus()); assertSyncResultTimes(conversionTarget2Results.get(i), start); } @@ -330,7 +340,7 @@ void syncChangesOneFormatWithNoRequiredChanges() { conversionTarget2Results.forEach( syncResult -> { assertEquals(SyncMode.INCREMENTAL, syncResult.getMode()); - assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getStatus()); + assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getTableFormatSyncStatus()); assertSyncResultTimes(syncResult, start); }); diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index bc5f5e02d..222652a6d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -85,7 +85,7 @@ public Map sync( if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { throw new IllegalArgumentException("Please provide at-least one format to sync"); } - config = ConversionUtils.normalizeTargetPaths(config); + try (ConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource source = ExtractFromSource.of(conversionSource); @@ -143,7 
+143,7 @@ public Map sync( private static String getFormatsWithStatusCode( Map syncResultsMerged, SyncResult.SyncStatusCode statusCode) { return syncResultsMerged.entrySet().stream() - .filter(entry -> entry.getValue().getStatus().getStatusCode() == statusCode) + .filter(entry -> entry.getValue().getTableFormatSyncStatus().getStatusCode() == statusCode) .map(Map.Entry::getKey) .collect(Collectors.joining(",")); } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java deleted file mode 100644 index fdeedc9bb..000000000 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.conversion; - -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.xtable.model.storage.TableFormat; - -public class ConversionUtils { - - /** - * Few table formats need the metadata to be located at the root level of the data files. 
Eg: An - * iceberg table generated through spark will have two directories basePath/data and - * basePath/metadata For synchronising the iceberg metadata to hudi and delta, they need to be - * present in basePath/data/.hoodie and basePath/data/_delta_log. - * - * @param config conversion config for synchronizing source and target tables - * @return updated table config. - */ - public static ConversionConfig normalizeTargetPaths(ConversionConfig config) { - if (!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath()) - && config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) { - List updatedTargetTables = - config.getTargetTables().stream() - .filter( - targetTable -> - targetTable.getFormatName().equals(TableFormat.HUDI) - || targetTable.getFormatName().equals(TableFormat.DELTA)) - .map( - targetTable -> - targetTable.toBuilder() - .basePath(config.getSourceTable().getDataPath()) - .build()) - .collect(Collectors.toList()); - return new ConversionConfig( - config.getSourceTable(), updatedTargetTables, config.getSyncMode()); - } - return config; - } -} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index a5937b022..140eb8adc 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -83,6 +83,13 @@ public InternalTable getTable(Long version) { return tableExtractor.table(deltaLog, tableName, version); } + @Override + public InternalTable getCurrentTable() { + DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath); + Snapshot snapshot = deltaLog.snapshot(); + return getTable(snapshot.version()); + } + @Override public InternalSnapshot getCurrentSnapshot() { DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath); diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 1b1d0bf37..02423c2d6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -76,6 +76,19 @@ public InternalTable getTable(HoodieInstant commit) { return tableExtractor.table(metaClient, commit); } + @Override + public InternalTable getCurrentTable() { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline completedTimeline = activeTimeline.filterCompletedInstants(); + // get latest commit + HoodieInstant latestCommit = + completedTimeline + .lastInstant() + .orElseThrow( + () -> new ReadException("Unable to read latest commit from Hudi source table")); + return getTable(latestCommit); + } + @Override public InternalSnapshot getCurrentSnapshot() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index f96ec7149..c84fb196b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -131,6 +131,13 @@ public InternalTable getTable(Snapshot snapshot) { .build(); } + @Override + public InternalTable getCurrentTable() { + Table iceTable = getSourceTable(); + Snapshot currentSnapshot = iceTable.currentSnapshot(); + return getTable(currentSnapshot); + } + @Override public InternalSnapshot getCurrentSnapshot() { Table iceTable = getSourceTable(); diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index b5ffcdf13..3d539766a 100644 --- 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -1013,7 +1013,8 @@ private static ConversionConfig getTableSyncConfig( TargetTable.builder() .name(tableName) .formatName(formatName) - .basePath(table.getBasePath()) + // set the metadata path to the data path as the default (required by Hudi) + .basePath(table.getDataPath()) .metadataRetention(metadataRetention) .build()) .collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index 652bbe426..caba80468 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -392,7 +392,7 @@ private SyncResult buildSyncResult(SyncMode syncMode, Instant lastSyncedInstant) return SyncResult.builder() .mode(syncMode) .lastInstantSynced(lastSyncedInstant) - .status(SyncResult.SyncStatus.SUCCESS) + .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java deleted file mode 100644 index b1044039f..000000000 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.conversion; - -import static org.junit.jupiter.api.Assertions.*; - -import java.util.Arrays; - -import org.junit.jupiter.api.Test; - -import org.apache.xtable.model.storage.TableFormat; -import org.apache.xtable.model.sync.SyncMode; - -class TestConversionUtils { - - @Test - void testNormalizeTargetPaths() { - ConversionConfig config = - ConversionConfig.builder() - .sourceTable( - SourceTable.builder() - .name("table_name") - .formatName(TableFormat.ICEBERG) - .basePath("/tmp/basePath") - .dataPath("/tmp/basePath/data") - .build()) - .syncMode(SyncMode.FULL) - .targetTables( - Arrays.asList( - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath") - .formatName(TableFormat.DELTA) - .build(), - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath") - .formatName(TableFormat.HUDI) - .build())) - .build(); - ConversionConfig expectedNormalizedConfig = - ConversionConfig.builder() - .sourceTable( - SourceTable.builder() - .name("table_name") - .formatName(TableFormat.ICEBERG) - .basePath("/tmp/basePath") - .dataPath("/tmp/basePath/data") - .build()) - .syncMode(SyncMode.FULL) - .targetTables( - Arrays.asList( - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath/data") - .formatName(TableFormat.DELTA) - .build(), - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath/data") - .formatName(TableFormat.HUDI) - .build())) - .build(); - ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); - assertEquals(expectedNormalizedConfig, 
actualConfig); - } - - @Test - void testNormalizeTargetPathsNoOp() { - ConversionConfig config = - ConversionConfig.builder() - .sourceTable( - SourceTable.builder() - .name("table_name") - .formatName(TableFormat.HUDI) - .basePath("/tmp/basePath") - .build()) - .syncMode(SyncMode.FULL) - .targetTables( - Arrays.asList( - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath") - .formatName(TableFormat.ICEBERG) - .build(), - TargetTable.builder() - .name("table_name") - .basePath("/tmp/basePath") - .formatName(TableFormat.DELTA) - .build())) - .build(); - ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); - assertEquals(config, actualConfig); - } -} diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java index 8fcf07533..ca8bc3fa0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java @@ -18,6 +18,7 @@ package org.apache.xtable.delta; +import static org.apache.xtable.testutil.ITTestUtils.validateTable; import static org.junit.jupiter.api.Assertions.*; import java.net.URI; @@ -208,6 +209,44 @@ void getCurrentSnapshotNonPartitionedTest() throws URISyntaxException { snapshot.getPartitionedDataFiles().get(0)); } + @Test + void getCurrentTableTest() { + // Table name + final String tableName = GenericTable.getTableName(); + final Path basePath = tempDir.resolve(tableName); + // Create table with a single row using Spark + sparkSession.sql( + "CREATE TABLE `" + + tableName + + "` USING DELTA LOCATION '" + + basePath + + "' AS SELECT * FROM VALUES (1, 2)"); + // Create Delta source + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .formatName(TableFormat.DELTA) + .build(); + DeltaConversionSource conversionSource = 
+ conversionSourceProvider.getConversionSourceInstance(tableConfig); + // Get current table + InternalTable internalTable = conversionSource.getCurrentTable(); + List fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD); + validateTable( + internalTable, + tableName, + TableFormat.DELTA, + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .fields(fields) + .build(), + DataLayoutStrategy.FLAT, + "file:" + basePath, + Collections.emptyList()); + } + @Test void getCurrentSnapshotPartitionedTest() throws URISyntaxException { // Table name @@ -660,22 +699,6 @@ private void validateDeltaPartitioning(InternalSnapshot internalSnapshot) { assertEquals(PartitionTransformType.YEAR, partitionField.getTransformType()); } - private static void validateTable( - InternalTable internalTable, - String tableName, - String tableFormat, - InternalSchema readSchema, - DataLayoutStrategy dataLayoutStrategy, - String basePath, - List partitioningFields) { - Assertions.assertEquals(tableName, internalTable.getName()); - Assertions.assertEquals(tableFormat, internalTable.getTableFormat()); - Assertions.assertEquals(readSchema, internalTable.getReadSchema()); - Assertions.assertEquals(dataLayoutStrategy, internalTable.getLayoutStrategy()); - Assertions.assertEquals(basePath, internalTable.getBasePath()); - Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); - } - private void validatePartitionDataFiles( PartitionFileGroup expectedPartitionFiles, PartitionFileGroup actualPartitionFiles) throws URISyntaxException { diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java index c074bc23f..3debf9043 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java @@ -19,6 +19,7 @@ package 
org.apache.xtable.hudi; import static java.util.stream.Collectors.groupingBy; +import static org.apache.xtable.testutil.ITTestUtils.validateTable; import static org.junit.jupiter.api.Assertions.*; import java.nio.file.Path; @@ -26,6 +27,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -38,6 +40,7 @@ import lombok.Builder; import lombok.Value; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -65,7 +68,13 @@ import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; /** * A suite of functional tests that the extraction from Hudi to Intermediate representation works. 
@@ -98,6 +107,122 @@ public static void teardown() { } } + @Test + void getCurrentTableTest() { + String tableName = GenericTable.getTableName(); + Path basePath = tempDir.resolve(tableName); + HudiTestUtil.PartitionConfig partitionConfig = HudiTestUtil.PartitionConfig.of(null, null); + Schema schema = + Schema.createRecord( + "testCurrentTable", + null, + "hudi", + false, + Arrays.asList( + new Schema.Field("field1", Schema.create(Schema.Type.STRING)), + new Schema.Field("field2", Schema.create(Schema.Type.STRING)))); + try (TestJavaHudiTable table = + TestJavaHudiTable.withSchema( + tableName, + tempDir, + HudiTestUtil.PartitionConfig.of(null, null).getHudiConfig(), + HoodieTableType.MERGE_ON_READ, + schema)) { + table.insertRecords(5, Collections.emptyList(), false); + HudiConversionSource hudiClient = + getHudiSourceClient( + CONFIGURATION, table.getBasePath(), partitionConfig.getXTableConfig()); + InternalTable internalTable = hudiClient.getCurrentTable(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("testCurrentTable") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("_hoodie_commit_time") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("_hoodie_commit_seqno") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("_hoodie_record_key") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("_hoodie_partition_path") + .schema( + InternalSchema.builder() + 
.name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("_hoodie_file_name") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("field1") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue(null) + .build(), + InternalField.builder() + .name("field2") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .defaultValue(null) + .build())) + .recordKeyFields(Collections.singletonList(null)) + .build(); + validateTable( + internalTable, + tableName, + TableFormat.HUDI, + internalSchema, + DataLayoutStrategy.FLAT, + "file:" + basePath + "_v1", + Collections.emptyList()); + } + } + @ParameterizedTest @MethodSource("testsForAllTableTypesAndPartitions") public void insertAndUpsertData( diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java index 3f20ac9db..acd886887 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java @@ -21,6 +21,7 @@ import static org.apache.xtable.GenericTable.getTableName; import static org.apache.xtable.ValidationTestHelper.validateSnapshot; import static org.apache.xtable.ValidationTestHelper.validateTableChanges; +import static org.apache.xtable.testutil.ITTestUtils.validateTable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static 
org.junit.jupiter.api.Assertions.assertTrue; @@ -29,6 +30,8 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -49,9 +52,14 @@ import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.TableFormat; public class ITIcebergConversionTargetSource { @@ -66,6 +74,67 @@ void setup() { sourceProvider.init(hadoopConf); } + @Test + void getCurrentTableTest() { + String tableName = getTableName(); + try (TestIcebergTable testIcebergTable = + new TestIcebergTable( + tableName, + tempDir, + hadoopConf, + "field1", + Collections.singletonList(null), + TestIcebergDataHelper.SchemaType.BASIC)) { + testIcebergTable.insertRows(50); + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + .formatName(TableFormat.ICEBERG) + .build(); + IcebergConversionSource conversionSource = + sourceProvider.getConversionSourceInstance(tableConfig); + InternalTable internalTable = conversionSource.getCurrentTable(); + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("field1") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + 
.isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("field2") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + validateTable( + internalTable, + testIcebergTable.getBasePath(), + TableFormat.ICEBERG, + internalSchema, + DataLayoutStrategy.FLAT, + testIcebergTable.getBasePath(), + Collections.emptyList()); + } + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testInsertsUpsertsAndDeletes(boolean isPartitioned) { diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java index 1d10fe7af..d90ba169f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java @@ -56,6 +56,10 @@ @Value public class TestIcebergDataHelper { private static final Random RANDOM = new Random(); + private static final List BASIC_FIELDS = + Arrays.asList( + NestedField.optional(1, "field1", Types.StringType.get()), + NestedField.optional(2, "field2", Types.StringType.get())); private static final List COMMON_FIELDS = Arrays.asList( NestedField.optional(1, "id", Types.StringType.get()), @@ -114,6 +118,7 @@ public class TestIcebergDataHelper { private static final Schema SCHEMA_WITH_UUID_COLUMN = new Schema( Stream.concat(COMMON_FIELDS.stream(), UUID_FIELDS.stream()).collect(Collectors.toList())); + private static final Schema BASIC_SCHEMA = new Schema(BASIC_FIELDS); private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); @@ -122,6 +127,7 @@ public class TestIcebergDataHelper { List 
partitionFieldNames; public static enum SchemaType { + BASIC, COMMON, COMMON_WITH_ADDITIONAL_COLUMNS, COMMON_WITH_UUID_COLUMN, @@ -145,6 +151,8 @@ private static Schema getSchema(SchemaType schemaType) { return SCHEMA_WITH_ADDITIONAL_COLUMNS; case COMMON_WITH_UUID_COLUMN: return SCHEMA_WITH_UUID_COLUMN; + case BASIC: + return BASIC_SCHEMA; default: throw new IllegalArgumentException("Unknown schema type: " + schemaType); } diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java new file mode 100644 index 000000000..281e61fe1 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.testutil; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; + +public class ITTestUtils { + + public static void validateTable( + InternalTable internalTable, + String tableName, + String tableFormat, + InternalSchema readSchema, + DataLayoutStrategy dataLayoutStrategy, + String basePath, + List partitioningFields) { + Assertions.assertEquals(tableName, internalTable.getName()); + Assertions.assertEquals(tableFormat, internalTable.getTableFormat()); + Assertions.assertEquals(readSchema, internalTable.getReadSchema()); + Assertions.assertEquals(dataLayoutStrategy, internalTable.getLayoutStrategy()); + Assertions.assertEquals(basePath, internalTable.getBasePath()); + Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); + } +} diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java index a9653eb51..7491cec41 100644 --- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java +++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java @@ -106,7 +106,7 @@ public void syncHoodieTable() { results.entrySet().stream() .filter( entry -> - entry.getValue().getStatus().getStatusCode() + entry.getValue().getTableFormatSyncStatus().getStatusCode() != SyncResult.SyncStatusCode.SUCCESS) .map(Map.Entry::getKey) .collect(Collectors.joining(","));