From 3a70f0c4557769184c14ba30137301372a6c2476 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 18 Dec 2023 13:05:31 -0800 Subject: [PATCH] cdk fixes for dv2 (#33506) Signed-off-by: Gireesh Sreepathi Co-authored-by: Gireesh Sreepathi --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../integrations/destination_async/FlushWorkers.java | 12 ++++++++---- .../core/src/main/resources/version.properties | 2 +- .../destination/jdbc/JdbcSqlOperations.java | 11 ++++++----- .../destination/DestinationAcceptanceTest.java | 4 ++-- 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index fb8eadee4419..10439f81f7d7 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -156,6 +156,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling | | 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently | | 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state | | 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. 
| diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java index b02ebdf131b1..78ad409196cd 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java @@ -222,13 +222,17 @@ public void close() throws Exception { // before shutting down the supervisor, flush all state. emitStateMessages(stateManager.flushStates()); supervisorThread.shutdown(); - final var supervisorShut = supervisorThread.awaitTermination(5L, TimeUnit.MINUTES); - log.info("Closing flush workers -- Supervisor shutdown status: {}", supervisorShut); + while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) { + log.info("Waiting for flush worker supervisor to shut down"); + } + log.info("Closing flush workers -- supervisor shut down"); log.info("Closing flush workers -- Starting worker pool shutdown.."); workerPool.shutdown(); - final var workersShut = workerPool.awaitTermination(5L, TimeUnit.MINUTES); - log.info("Closing flush workers -- Workers shutdown status: {}", workersShut); + while (!workerPool.awaitTermination(5L, TimeUnit.MINUTES)) { + log.info("Waiting for flush workers to shut down"); + } + log.info("Closing flush workers -- workers shut down"); debugLoop.shutdownNow(); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 0d8bc8dfe0ed..b21853fac3ad 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.7.8 +version=0.8.0 diff --git 
a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java index 99b82dd8f8c5..a7db620058fc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java @@ -11,7 +11,6 @@ import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.io.File; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; @@ -110,13 +109,15 @@ protected String createTableQueryV2(final String schemaName, final String tableN // TODO: This method seems to be used by Postgres and others while staging to local temp files. // Should there be a Local staging operations equivalent - protected void writeBatchToFile(final File tmpFile, final List<AirbyteRecordMessage> records) throws Exception { + protected void writeBatchToFile(final File tmpFile, final List<PartialAirbyteMessage> records) throws Exception { try (final PrintWriter writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8); final CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) { - for (final AirbyteRecordMessage record : records) { + for (final PartialAirbyteMessage record : records) { final var uuid = UUID.randomUUID().toString(); - final var jsonData = Jsons.serialize(formatData(record.getData())); - final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt())); + // TODO we only need to do this if formatData is overridden.
If not, we can just do jsonData = + // record.getSerialized() + final var jsonData = Jsons.serialize(formatData(Jsons.deserializeExact(record.getSerialized()))); + final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getRecord().getEmittedAt())); if (TypingAndDedupingFlag.isDestinationV2()) { csvPrinter.printRecord(uuid, jsonData, extractedAt, null); } else { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java index e3204dd54899..c0938ba6d4bc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -99,7 +99,7 @@ public abstract class DestinationAcceptanceTest { - protected static final HashSet TEST_SCHEMAS = new HashSet<>(); + protected HashSet TEST_SCHEMAS; private static final Random RANDOM = new Random(); private static final String NORMALIZATION_VERSION = "dev"; @@ -357,7 +357,7 @@ void setUpInternal() throws Exception { LOGGER.info("localRoot: {}", localRoot); testEnv = new TestDestinationEnv(localRoot); mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class); - + TEST_SCHEMAS = new HashSet<>(); setup(testEnv, TEST_SCHEMAS); processFactory = new DockerProcessFactory(