diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java index 64cf29ef12ba..6efc13189cec 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java @@ -11,6 +11,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; @@ -30,17 +31,20 @@ public BigQueryAsyncStandardFlush(BigQuery bigQuery, @Override public void flush(final StreamDescriptor decs, final Stream stream) throws Exception { ConcurrentMap> uploaderMapSupplied = uploaderMap.get(); + AtomicInteger recordCount = new AtomicInteger(); stream.forEach(aibyteMessage -> { try { AirbyteStreamNameNamespacePair sd = new AirbyteStreamNameNamespacePair(aibyteMessage.getRecord().getStream(), aibyteMessage.getRecord().getNamespace()); uploaderMapSupplied.get(sd).upload(aibyteMessage); + recordCount.getAndIncrement(); } catch (Exception e) { log.error("BQ async standard flush"); log.error(aibyteMessage.toString()); throw e; } }); + log.error("Record count for standard flush: " + recordCount.get()); uploaderMapSupplied.values().forEach(test -> test.closeAfterPush()); }