Skip to content

Commit

Permalink
Address comments except SourceProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
vinishjail97 committed Dec 31, 2024
1 parent 89cf227 commit 0d5b763
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@
public interface CatalogConversionSource {
/** Returns the source table object present in the catalog. */
SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);

/** Returns the {@link org.apache.xtable.model.storage.CatalogType} for the catalog conversion */
String getCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public interface CatalogSyncClient<TABLE> extends AutoCloseable {
*/
String getCatalogId();

/** Returns the {@link org.apache.xtable.model.storage.CatalogType} the client syncs to */
String getCatalogType();

/** Returns the storage location of the table synced to the catalog. */
String getStorageLocation(TABLE table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
package org.apache.xtable.catalog;

import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Function;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;

/** A factory class which returns {@link ExternalCatalogConfig} based on catalogType. */
public class ExternalCatalogConfigFactory {

public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
String catalogSyncClientImpl = "";
String catalogConversionSourceImpl = "";
String catalogSyncClientImpl =
findImplClassName(CatalogSyncClient.class, catalogType, CatalogSyncClient::getCatalogType);
String catalogConversionSourceImpl =
findImplClassName(
CatalogConversionSource.class, catalogType, CatalogConversionSource::getCatalogType);
;
return ExternalCatalogConfig.builder()
.catalogType(catalogType)
.catalogSyncClientImpl(catalogSyncClientImpl)
Expand All @@ -38,4 +46,16 @@ public static ExternalCatalogConfig fromCatalogType(
.catalogProperties(properties)
.build();
}

private static <T> String findImplClassName(
Class<T> serviceClass, String catalogType, Function<T, String> catalogTypeExtractor) {
ServiceLoader<T> loader = ServiceLoader.load(serviceClass);
for (T instance : loader) {
String instanceCatalogType = catalogTypeExtractor.apply(instance);
if (catalogType.equals(instanceCatalogType)) {
return instance.getClass().getName();
}
}
throw new NotSupportedException("catalogType is not yet supported: " + catalogType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import static org.apache.xtable.testutil.ITTestUtils.TEST_CATALOG_TYPE;
import static org.junit.jupiter.api.Assertions.*;

import java.util.Collections;
import java.util.UUID;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.testutil.ITTestUtils;

class TestExternalCatalogConfigFactory {

@Test
void testFromCatalogType() {
ExternalCatalogConfig externalCatalogConfig =
ExternalCatalogConfigFactory.fromCatalogType(
TEST_CATALOG_TYPE, UUID.randomUUID().toString(), Collections.emptyMap());
Assertions.assertEquals(
ITTestUtils.TestCatalogSyncImpl.class.getName(),
externalCatalogConfig.getCatalogSyncClientImpl());
Assertions.assertEquals(
ITTestUtils.TestCatalogConversionSourceImpl.class.getName(),
externalCatalogConfig.getCatalogConversionSourceImpl());
}

@Test
void testFromCatalogTypeNotFound() {
Assertions.assertThrows(
NotSupportedException.class,
() ->
ExternalCatalogConfigFactory.fromCatalogType(
"invalid", UUID.randomUUID().toString(), Collections.emptyMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ void testNoTableFormatConversionWithMultipleCatalogSync() {
when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
// Mocks for tableFormatSync.
Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour, Duration.ofSeconds(1));
Instant syncStartTime = Instant.now();
SyncResult syncResult =
buildSyncResult(syncMode, instantBeforeHour, syncStartTime, Duration.ofSeconds(1));
Map<String, SyncResult> tableFormatSyncResults =
buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
when(tableFormatSync.syncSnapshot(
Expand All @@ -470,7 +472,8 @@ void testNoTableFormatConversionWithMultipleCatalogSync() {
targetCatalogs.get(0).getCatalogTableIdentifier(), mockCatalogSyncClient1,
targetCatalogs.get(1).getCatalogTableIdentifier(), mockCatalogSyncClient2)),
any()))
.thenReturn(buildSyncResult(syncMode, Instant.now(), Duration.ofSeconds(3)));
.thenReturn(
buildSyncResult(syncMode, syncStartTime, instantBeforeHour, Duration.ofSeconds(3)));
ConversionController conversionController =
new ConversionController(
mockConf,
Expand Down Expand Up @@ -523,11 +526,11 @@ private SyncResult buildSyncResult(SyncMode syncMode, Instant lastSyncedInstant)
}

private SyncResult buildSyncResult(
SyncMode syncMode, Instant lastSyncedInstant, Duration duration) {
SyncMode syncMode, Instant syncStartTime, Instant lastSyncedInstant, Duration duration) {
return SyncResult.builder()
.mode(syncMode)
.lastInstantSynced(lastSyncedInstant)
.syncStartTime(Instant.now().minusSeconds(duration.getSeconds()))
.syncStartTime(syncStartTime)
.syncDuration(duration)
.tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.xtable.spi.sync.CatalogSyncClient;

public class ITTestUtils {
public static final String TEST_CATALOG_TYPE = "test";

public static void validateTable(
InternalTable internalTable,
Expand All @@ -59,12 +60,19 @@ public static class TestCatalogSyncImpl implements CatalogSyncClient {
public TestCatalogSyncImpl(
ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {}

public TestCatalogSyncImpl() {}

@Override
public String getCatalogId() {
trackFunctionCall();
return null;
}

@Override
public String getCatalogType() {
return TEST_CATALOG_TYPE;
}

@Override
public String getStorageLocation(Object o) {
trackFunctionCall();
Expand Down Expand Up @@ -128,6 +136,8 @@ public static class TestCatalogConversionSourceImpl implements CatalogConversion
public TestCatalogConversionSourceImpl(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {}

public TestCatalogConversionSourceImpl() {}

@Override
public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
return SourceTable.builder()
Expand All @@ -136,5 +146,10 @@ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
.formatName("ICEBERG")
.build();
}

@Override
public String getCatalogType() {
return TEST_CATALOG_TYPE;
}g
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
##########################################################################
org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
##########################################################################
org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Builder;
import lombok.Value;
Expand Down Expand Up @@ -182,16 +182,17 @@ public static void main(String[] args) throws Exception {
.syncMode(SyncMode.INCREMENTAL)
.build();
List<String> tableFormats =
new ArrayList<>(Collections.singleton(sourceTable.getFormatName()));
tableFormats.addAll(
targetTables.stream().map(TargetTable::getFormatName).collect(Collectors.toList()));
tableFormats = tableFormats.stream().distinct().collect(Collectors.toList());
Stream.concat(
Stream.of(sourceTable.getFormatName()),
targetTables.stream().map(TargetTable::getFormatName))
.distinct()
.collect(Collectors.toList());
try {
conversionController.syncTableAcrossCatalogs(
conversionConfig,
getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
} catch (Exception e) {
log.error(String.format("Error running sync for %s", sourceTable.getBasePath()), e);
log.error("Error running sync for {}", sourceTable.getBasePath(), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public static void main(String[] args) throws IOException {
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
} catch (Exception e) {
log.error(String.format("Error running sync for %s", table.getTableBasePath()), e);
log.error("Error running sync for {}", table.getTableBasePath(), e);
}
}
}
Expand Down

0 comments on commit 0d5b763

Please sign in to comment.