Skip to content

Commit

Permalink
cdk fixes for dv2 (#33506)
Browse files Browse the repository at this point in the history
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
  • Loading branch information
edgao and gisripa authored Dec 18, 2023
1 parent 743ab29 commit 3a70f0c
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.8.0 | 2023-12-18 | [\#33506](https://github.com/airbytehq/airbyte/pull/33506) | Improve async destination shutdown logic; more JDBC async migration work; improve DAT test schema handling |
| 0.7.8 | 2023-12-18 | [\#33365](https://github.com/airbytehq/airbyte/pull/33365) | Emit stream statuses more consistently |
| 0.7.7 | 2023-12-18 | [\#33434](https://github.com/airbytehq/airbyte/pull/33307) | Remove LEGACY state |
| 0.7.6 | 2023-12-14 | [\#32328](https://github.com/airbytehq/airbyte/pull/33307) | Add schema less mode for mongodb CDC. Fixes for non standard mongodb id type. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,17 @@ public void close() throws Exception {
// before shutting down the supervisor, flush all state.
emitStateMessages(stateManager.flushStates());
supervisorThread.shutdown();
final var supervisorShut = supervisorThread.awaitTermination(5L, TimeUnit.MINUTES);
log.info("Closing flush workers -- Supervisor shutdown status: {}", supervisorShut);
while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) {
log.info("Waiting for flush worker supervisor to shut down");
}
log.info("Closing flush workers -- supervisor shut down");

log.info("Closing flush workers -- Starting worker pool shutdown..");
workerPool.shutdown();
final var workersShut = workerPool.awaitTermination(5L, TimeUnit.MINUTES);
log.info("Closing flush workers -- Workers shutdown status: {}", workersShut);
while (!workerPool.awaitTermination(5L, TimeUnit.MINUTES)) {
log.info("Waiting for flush workers to shut down");
}
log.info("Closing flush workers -- workers shut down");

debugLoop.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.7.8
version=0.8.0
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.io.File;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -110,13 +109,15 @@ protected String createTableQueryV2(final String schemaName, final String tableN

// TODO: This method seems to be used by Postgres and others while staging to local temp files.
// Should there be a Local staging operations equivalent
protected void writeBatchToFile(final File tmpFile, final List<AirbyteRecordMessage> records) throws Exception {
protected void writeBatchToFile(final File tmpFile, final List<PartialAirbyteMessage> records) throws Exception {
try (final PrintWriter writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
final CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT)) {
for (final AirbyteRecordMessage record : records) {
for (final PartialAirbyteMessage record : records) {
final var uuid = UUID.randomUUID().toString();
final var jsonData = Jsons.serialize(formatData(record.getData()));
final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
// TODO we only need to do this is formatData is overridden. If not, we can just do jsonData =
// record.getSerialized()
final var jsonData = Jsons.serialize(formatData(Jsons.deserializeExact(record.getSerialized())));
final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getRecord().getEmittedAt()));
if (TypingAndDedupingFlag.isDestinationV2()) {
csvPrinter.printRecord(uuid, jsonData, extractedAt, null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@

public abstract class DestinationAcceptanceTest {

protected static final HashSet<String> TEST_SCHEMAS = new HashSet<>();
protected HashSet<String> TEST_SCHEMAS;

private static final Random RANDOM = new Random();
private static final String NORMALIZATION_VERSION = "dev";
Expand Down Expand Up @@ -357,7 +357,7 @@ void setUpInternal() throws Exception {
LOGGER.info("localRoot: {}", localRoot);
testEnv = new TestDestinationEnv(localRoot);
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);

TEST_SCHEMAS = new HashSet<>();
setup(testEnv, TEST_SCHEMAS);

processFactory = new DockerProcessFactory(
Expand Down

0 comments on commit 3a70f0c

Please sign in to comment.