From b636874127c25a47bbcb661aea3ac6ca405c75d4 Mon Sep 17 00:00:00 2001 From: tryangul Date: Mon, 18 Sep 2023 10:57:13 -0700 Subject: [PATCH] Lower available buffer size for BQ. Add/edit comments to clarify how this all works. --- .../destination_async/GlobalMemoryManager.java | 14 ++++++++------ .../destination_async/buffers/BufferManager.java | 11 +++++++---- .../bigquery/BigQueryStagingConsumerFactory.java | 12 +++++++++++- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java index 726e1c7a4cc1..cd6686738a36 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/GlobalMemoryManager.java @@ -9,16 +9,18 @@ import org.apache.commons.io.FileUtils; /** - * Responsible for managing global memory across multiple queues in a thread-safe way. + * Responsible for managing buffer memory across multiple queues in a thread-safe way. This does + * not allocate or free memory in the traditional sense, but rather manages it based on memory estimates + * provided by the callers. *

- * This means memory allocation and de-allocation for each queue can be dynamically adjusted + * The goal is to enable maximum allowed memory bounds for each queue to be dynamically adjusted * according to the overall available memory. Memory blocks are managed in chunks of * {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time. *

- * As a destination has no information about incoming per-stream records, having static non-global + * As a destination has no information about incoming per-stream records, having static * queue sizes can cause unnecessary backpressure on a per-stream basis. By providing a dynamic, - * global view of memory management, this class allows each queue to free and consume memory - * dynamically, enabling effective sharing of global memory resources across all the queues. and + * global view of buffer memory management, this class allows each queue to release and request memory + * dynamically, enabling effective sharing of global memory resources across all the queues, and * avoiding accidental stream backpressure. *

* This becomes particularly useful in the following scenarios: @@ -77,7 +79,7 @@ public synchronized long requestMemory() { } /** - * Frees a block of memory of the given size. If the amount of memory freed exceeds the current + * Releases a block of memory of the given size. If the amount of memory released exceeds the current * memory allocation, a warning will be logged. * * @param bytes the size of the block to free, in bytes diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/buffers/BufferManager.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/buffers/BufferManager.java index 8991e7e3fd9b..9ae33e0d6b24 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/buffers/BufferManager.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination_async/buffers/BufferManager.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination_async.buffers; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.integrations.destination_async.AirbyteFileUtils; import io.airbyte.integrations.destination_async.FlushWorkers; import io.airbyte.integrations.destination_async.GlobalMemoryManager; @@ -35,13 +34,17 @@ public class BufferManager { private final ScheduledExecutorService debugLoop; public BufferManager() { - this((long) (Runtime.getRuntime().maxMemory() * 0.8)); + this((long) (Runtime.getRuntime().maxMemory() * 0.7)); } - @VisibleForTesting + /** + * @param memoryLimit the amount of estimated memory we allow for all buffers. The GlobalMemoryManager will apply back pressure once this quota is + * filled. "Memory" can be released back once flushing finishes. This number should be large enough that we don't block reading + * unnecessarily, but small enough that we apply back pressure before OOMing. 
+ */ public BufferManager(final long memoryLimit) { maxMemory = memoryLimit; - LOGGER.info("Memory available to the JVM {}", FileUtils.byteCountToDisplaySize(maxMemory)); + LOGGER.info("Max 'memory' available for buffer allocation {}", FileUtils.byteCountToDisplaySize(maxMemory)); memoryManager = new GlobalMemoryManager(maxMemory); this.stateManager = new GlobalAsyncStateManager(memoryManager); buffers = new ConcurrentHashMap<>(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index e5d52f70e285..73ce256e1dc1 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -64,10 +64,20 @@ public SerializedAirbyteMessageConsumer createAsync( () -> onCloseFunction(bigQueryGcsOperations, writeConfigsByDescriptor, typerDeduper).accept(false), flusher, catalog, - new BufferManager(), + new BufferManager(getBigQueryBufferMemoryLimit()), defaultNamespace); } + /** + * Our BigQuery uploader threads use a fair amount of memory. We believe this is largely + * due to the SDK client we use. + * + * @return number of bytes to make available for message buffering. + */ + private long getBigQueryBufferMemoryLimit() { + return (long) (Runtime.getRuntime().maxMemory() * 0.4); + } + private Map createWriteConfigs(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final ParsedCatalog parsedCatalog,