diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java index a1d2cf2dd1372..a2a6212fc0e5a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannel.java @@ -57,17 +57,13 @@ public List fetch(final int batchSize, final long timeoutMillis) { long startMillis = System.currentTimeMillis(); int recordsCount = 0; do { - List records = queue.poll(); + List records = queue.poll(Math.max(0, timeoutMillis - (System.currentTimeMillis() - startMillis)), TimeUnit.MILLISECONDS); if (null == records || records.isEmpty()) { - TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMillis)); - } else { - recordsCount += records.size(); - result.addAll(records); + continue; } - if (recordsCount >= batchSize) { - break; - } - } while (System.currentTimeMillis() - startMillis < timeoutMillis); + recordsCount += records.size(); + result.addAll(records); + } while (recordsCount < batchSize && System.currentTimeMillis() - startMillis < timeoutMillis); return result; } diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java index 78dcfa672413c..7899b8529778e 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java @@ -40,7 +40,15 @@ void 
assertZeroQueueSizeWorks() { List records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); Thread thread = new Thread(() -> channel.push(records)); thread.start(); - assertThat(channel.fetch(1, 500L), is(records)); + assertThat(channel.fetch(1, 500L), is(records)); thread.join(); } + + @Test + void assertFetchWithZeroTimeout() { + MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())); + List records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); + channel.push(records); + assertThat(channel.fetch(10, 0L), is(records)); + } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index e4eb4a36da900..1ce11748f68cf 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -146,7 +146,7 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX(); Importer importer = importerUsed.get() ?
null - : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm()); + : new CDCImporter(channelProgressPairs, 1, 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm()); PipelineTask incrementalTask = new CDCIncrementalTask( dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index e3167fc60f288..930fd9a9a9f41 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -207,16 +207,16 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress); Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()) .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader); - Collection importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext); + Collection importers = createIncrementalImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext); PipelineTask incrementalTask = new 
IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); } - private Collection createImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel, - final PipelineJobProgressListener jobProgressListener) { + private Collection createIncrementalImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel, + final PipelineJobProgressListener jobProgressListener) { Collection result = new LinkedList<>(); for (int i = 0; i < importerConfig.getConcurrency(); i++) { - result.add(new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, sink, jobProgressListener)); + result.add(new SingleChannelConsumerImporter(channel, 1, 5L, sink, jobProgressListener)); } return result; }