Skip to content

Commit

Permalink
Add flusher level count
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Oct 2, 2023
1 parent 0cfe7a1 commit 89077b0
Showing 1 changed file with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -30,17 +31,20 @@ public BigQueryAsyncStandardFlush(BigQuery bigQuery,
@Override
public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMapSupplied = uploaderMap.get();
AtomicInteger recordCount = new AtomicInteger();
stream.forEach(aibyteMessage -> {
try {
AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(aibyteMessage.getRecord().getStream(),
aibyteMessage.getRecord().getNamespace());
uploaderMapSupplied.get(sd).upload(aibyteMessage);
recordCount.getAndIncrement();
} catch (Exception e) {
log.error("BQ async standard flush");
log.error(aibyteMessage.toString());
throw e;
}
});
log.error("Record count for standard flush: " + recordCount.get());
uploaderMapSupplied.values().forEach(test -> test.closeAfterPush());
}

Expand Down

0 comments on commit 89077b0

Please sign in to comment.