diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt index 09ef96217d18..2378ef72c627 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt @@ -29,7 +29,6 @@ class DefaultProcessBatchTask( val streamLoader = syncManager.getOrAwaitStreamLoader(batchEnvelope.streamDescriptor) val nextBatch = streamLoader.processBatch(batchEnvelope.batch) val nextWrapped = batchEnvelope.withBatch(nextBatch) - log.error { "processing batch ${batchEnvelope}" } taskLauncher.handleNewBatch(nextWrapped.streamDescriptor, nextWrapped) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt index 042b5d1af292..852ee7375935 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FilePartAccumulator.kt @@ -60,10 +60,7 @@ class FilePartAccumulator( index: Long,) { val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor) - taskLauncher.handleNewBatch(streamDescriptor, wrapped) - if (batch.requiresProcessing) { - outputQueue.publish(wrapped) - } + outputQueue.publish(wrapped) } }