From af41f97974f94d8b7607e27e3c4875ad22a51b61 Mon Sep 17 00:00:00 2001 From: benmoriceau Date: Mon, 16 Dec 2024 16:16:09 -0800 Subject: [PATCH] Rm logs --- .../io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt | 4 ---- .../io/airbyte/cdk/load/task/internal/InputConsumerTask.kt | 1 - 2 files changed, 5 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 469929efe832a..6a828afad6f34 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -36,7 +36,6 @@ class DefaultProcessFileTask( override suspend fun execute() { outputQueue.use { inputQueue.consume().collect { (streamDescriptor, file, index) -> - log.error { "consuming index $index" } val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) val acc = accumulators.getOrPut(streamDescriptor) { @@ -52,11 +51,9 @@ class DefaultProcessFileTask( if (read == -1) { handleFilePart(file, ByteArray(0), index, true, acc, streamDescriptor, index) - log.error { "end of file" } break } else if (read < bytePart.size) { handleFilePart(file, bytePart.copyOfRange(0, read), index, true, acc, streamDescriptor, index) - log.error { "end of file" } break } else { handleFilePart(file, bytePart, index, false, acc, streamDescriptor, index) @@ -64,7 +61,6 @@ class DefaultProcessFileTask( } localFile.delete() } - log.error { "Closing input queue" } } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index e492da34df419..11e6df7d035d4 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -106,7 +106,6 @@ class DefaultInputConsumerTask( reserved.release() // safe because multiple calls conflate manager.markEndOfStream(true) fileTransferQueue.close() - log.error { "End Of Stream" } val envelope = BatchEnvelope( SimpleBatch(Batch.State.COMPLETE),