Skip to content

Commit

Permalink
only use queue
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Dec 17, 2024
1 parent 5adcfb3 commit a43a9c9
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class DefaultProcessBatchTask(
val streamLoader = syncManager.getOrAwaitStreamLoader(batchEnvelope.streamDescriptor)
val nextBatch = streamLoader.processBatch(batchEnvelope.batch)
val nextWrapped = batchEnvelope.withBatch(nextBatch)
log.error { "processing batch ${batchEnvelope}" }
taskLauncher.handleNewBatch(nextWrapped.streamDescriptor, nextWrapped)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ class FilePartAccumulator(
index: Long,) {

val wrapped = BatchEnvelope(batch, Range.singleton(index), streamDescriptor)
taskLauncher.handleNewBatch(streamDescriptor, wrapped)
if (batch.requiresProcessing) {
outputQueue.publish(wrapped)
}
outputQueue.publish(wrapped)

}
}

0 comments on commit a43a9c9

Please sign in to comment.