diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeue.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeue.java index 5487027f1795..d9f70dea9a87 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeue.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeue.java @@ -19,6 +19,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents the minimal interface over the underlying buffer queues required for dequeue @@ -30,6 +32,9 @@ // todo (cgardens) - make all the metadata methods more efficient. public class BufferDequeue { + private static final Logger LOGGER = LoggerFactory.getLogger(BufferDequeue.class); + + private final GlobalMemoryManager memoryManager; private final ConcurrentMap<StreamDescriptor, StreamAwareQueue> buffers; private final GlobalAsyncStateManager stateManager; @@ -74,6 +79,10 @@ public MemoryAwareMessageBatch take(final StreamDescriptor streamDescriptor, fin } } + LOGGER.info("TAKE QUEUE"); + LOGGER.info(String.valueOf(queue.size())); + LOGGER.info(streamDescriptor.getName()); + memoryManager.ghettoLog(String.format("Take queue size is: %d, name - %s | %s", queue.size(), "" + streamDescriptor.getNamespace(), streamDescriptor.getName())); if (queue.isEmpty()) {