
Commit

Lower available buffer size for BQ. Add/edit comments to clarify how this all works.
tryangul committed Sep 18, 2023
1 parent 4643e70 commit b636874
Showing 3 changed files with 26 additions and 11 deletions.
@@ -9,16 +9,18 @@
import org.apache.commons.io.FileUtils;

/**
- * Responsible for managing global memory across multiple queues in a thread-safe way.
+ * Responsible for managing buffer memory across multiple queues in a thread-safe way. This does
+ * not allocate or free memory in the traditional sense, but rather manages based off memory estimates
+ * provided by the callers.
* <p>
- * This means memory allocation and de-allocation for each queue can be dynamically adjusted
+ * The goal is to enable maximum allowed memory bounds for each queue to be dynamically adjusted
* according to the overall available memory. Memory blocks are managed in chunks of
* {@link #BLOCK_SIZE_BYTES}, and the total amount of memory managed is configured at creation time.
* <p>
- * As a destination has no information about incoming per-stream records, having static non-global
+ * As a destination has no information about incoming per-stream records, having static
* queue sizes can cause unnecessary backpressure on a per-stream basis. By providing a dynamic,
- * global view of memory management, this class allows each queue to free and consume memory
- * dynamically, enabling effective sharing of global memory resources across all the queues. and
+ * global view of buffer memory management, this class allows each queue to release and request memory
+ * dynamically, enabling effective sharing of global memory resources across all the queues, and
* avoiding accidental stream backpressure.
* <p>
* This becomes particularly useful in the following scenarios:
@@ -77,7 +79,7 @@ public synchronized long requestMemory() {
}

/**
- * Frees a block of memory of the given size. If the amount of memory freed exceeds the current
+ * Releases a block of memory of the given size. If the amount of memory released exceeds the current
* memory allocation, a warning will be logged.
*
* @param bytes the size of the block to free, in bytes
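To make the hunk above concrete, here is a minimal, self-contained sketch of the block-based accounting the new javadoc describes: estimates are handed out in fixed-size blocks against a configured ceiling, and callers hand them back once a flush completes. The class name, the 10 MB block size, the free() method name, and the "grant of 0 signals back pressure" behavior are illustrative assumptions, not the real GlobalMemoryManager.

public class MemoryAccountingSketch {

  // Assumed block size for illustration; the real constant lives in GlobalMemoryManager.
  private static final long BLOCK_SIZE_BYTES = 10 * 1024 * 1024;

  private final long maxMemoryBytes;
  private long currentMemoryBytes;

  public MemoryAccountingSketch(final long maxMemoryBytes) {
    this.maxMemoryBytes = maxMemoryBytes;
  }

  // Hands out up to one block of "memory" (an estimate, nothing is actually allocated).
  // Returning 0 is how back pressure would surface: the caller waits for a flush to free space.
  public synchronized long requestMemory() {
    if (currentMemoryBytes >= maxMemoryBytes) {
      return 0;
    }
    final long granted = Math.min(BLOCK_SIZE_BYTES, maxMemoryBytes - currentMemoryBytes);
    currentMemoryBytes += granted;
    return granted;
  }

  // Releases a previously granted estimate, mirroring the "Releases a block of memory" javadoc.
  public synchronized void free(final long bytes) {
    currentMemoryBytes = Math.max(0, currentMemoryBytes - bytes);
  }

  public static void main(final String[] args) {
    final MemoryAccountingSketch manager = new MemoryAccountingSketch(25L * 1024 * 1024); // 25 MB ceiling
    System.out.println(manager.requestMemory()); // 10485760 (one full block)
    System.out.println(manager.requestMemory()); // 10485760
    System.out.println(manager.requestMemory()); // 5242880, clipped to the remaining budget
    System.out.println(manager.requestMemory()); // 0: quota exhausted, back pressure
    manager.free(10L * 1024 * 1024);             // a queue flushed and returned its estimate
    System.out.println(manager.requestMemory()); // 10485760 again
  }
}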
@@ -4,7 +4,6 @@

package io.airbyte.integrations.destination_async.buffers;

- import com.google.common.annotations.VisibleForTesting;
import io.airbyte.integrations.destination_async.AirbyteFileUtils;
import io.airbyte.integrations.destination_async.FlushWorkers;
import io.airbyte.integrations.destination_async.GlobalMemoryManager;
@@ -35,13 +34,17 @@ public class BufferManager {
private final ScheduledExecutorService debugLoop;

public BufferManager() {
- this((long) (Runtime.getRuntime().maxMemory() * 0.8));
+ this((long) (Runtime.getRuntime().maxMemory() * 0.7));
}

- @VisibleForTesting
+ /**
+  * @param memoryLimit the amount of estimated memory we allow for all buffers. The GlobalMemoryManager will apply back pressure once this quota is
+  * filled. "Memory" can be released back once flushing finishes. This number should be large enough we don't block reading
+  * unnecessarily, but small enough we apply back pressure before OOMing.
+  */
public BufferManager(final long memoryLimit) {
maxMemory = memoryLimit;
- LOGGER.info("Memory available to the JVM {}", FileUtils.byteCountToDisplaySize(maxMemory));
+ LOGGER.info("Max 'memory' available for buffer allocation {}", FileUtils.byteCountToDisplaySize(maxMemory));
memoryManager = new GlobalMemoryManager(maxMemory);
this.stateManager = new GlobalAsyncStateManager(memoryManager);
buffers = new ConcurrentHashMap<>();
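As a quick illustration of the sizing trade-off the new constructor comment spells out, the sketch below computes the 70% default budget and the heap headroom left outside it, using the same commons-io formatting helper the class already imports. It is a standalone toy, not code from this commit.

import org.apache.commons.io.FileUtils;

public class DefaultBufferBudgetSketch {
  public static void main(final String[] args) {
    final long heap = Runtime.getRuntime().maxMemory();
    // 70% of the heap goes to the buffers (down from 80% before this commit); the rest stays
    // free for everything else the destination does, so back pressure kicks in before an OOM.
    final long bufferBudget = (long) (heap * 0.7);
    final long headroom = heap - bufferBudget;
    System.out.println("Max 'memory' available for buffer allocation "
        + FileUtils.byteCountToDisplaySize(bufferBudget));
    System.out.println("Heap headroom outside the buffers: "
        + FileUtils.byteCountToDisplaySize(headroom));
  }
}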
@@ -64,10 +64,20 @@ public SerializedAirbyteMessageConsumer createAsync(
() -> onCloseFunction(bigQueryGcsOperations, writeConfigsByDescriptor, typerDeduper).accept(false),
flusher,
catalog,
- new BufferManager(),
+ new BufferManager(getBigQueryBufferMemoryLimit()),
defaultNamespace);
}

+ /**
+  * Our BigQuery uploader threads use a fair amount of memory. We believe this is largely
+  * due to the SDK client we use.
+  *
+  * @return number of bytes to make available for message buffering.
+  */
+ private long getBigQueryBufferMemoryLimit() {
+   return (long) (Runtime.getRuntime().maxMemory() * 0.4);
+ }

private Map<StreamDescriptor, BigQueryWriteConfig> createWriteConfigs(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final ParsedCatalog parsedCatalog,
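Finally, a small sanity-check sketch of the intent behind the new getBigQueryBufferMemoryLimit(): BigQuery's 40% quota should sit well below the generic 70% default, with the difference left to the memory-hungry uploader threads. The class name and the plain main() harness are assumptions for illustration only.

public class BigQueryBufferLimitCheck {
  public static void main(final String[] args) {
    final long heap = Runtime.getRuntime().maxMemory();
    final long defaultLimit = (long) (heap * 0.7);  // new BufferManager() default after this commit
    final long bigQueryLimit = (long) (heap * 0.4); // what getBigQueryBufferMemoryLimit() returns
    if (bigQueryLimit >= defaultLimit) {
      throw new AssertionError("BigQuery's buffer quota should be smaller than the default");
    }
    // The gap is heap that stays free for the SDK's uploader threads instead of message buffers.
    System.out.println("Extra headroom reserved for uploads: " + (defaultLimit - bigQueryLimit) + " bytes");
  }
}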
