Skip to content

Commit

Permalink
Proper close
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Oct 10, 2023
1 parent 3be9554 commit 2671973
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
},
(hasFailed) -> {
try {
writeConfigs.forEach((streamId, uploader) -> {
uploader.closeAfterPush();
});
writeConfigs.forEach((streamId, uploader) -> uploader.closeWithoutState(hasFailed));
Thread.sleep(30 * 1000); // 30 seconds
typerDeduper.typeAndDedupe();
typerDeduper.commitFinalTables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ public void close(final boolean hasFailed, final Consumer<AirbyteMessage> output
}
}

public void closeWithoutState(final boolean hasFailed) {
try {
recordFormatter.printAndCleanFieldFails();

this.writer.close(hasFailed);

this.postProcessAction(hasFailed);
} catch (final Exception e) {
LOGGER.error(String.format("Failed to close %s writer, \n details: %s", this, e.getMessage()));
printHeapMemoryConsumption();
throw new RuntimeException(e);
}
}

public void closeAfterPush() {
try {
this.writer.close(false);
Expand Down

0 comments on commit 2671973

Please sign in to comment.