Skip to content

Commit

Permalink
Improve MemoryPipelineChannel.fetch (#29664)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandynz authored Jan 7, 2024
1 parent 998b6bd commit 518f352
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,13 @@ public List<Record> fetch(final int batchSize, final long timeoutMillis) {
long startMillis = System.currentTimeMillis();
int recordsCount = 0;
do {
List<Record> records = queue.poll();
List<Record> records = queue.poll(Math.max(0, timeoutMillis - (System.currentTimeMillis() - startMillis)), TimeUnit.MILLISECONDS);
if (null == records || records.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMillis));
} else {
recordsCount += records.size();
result.addAll(records);
continue;
}
if (recordsCount >= batchSize) {
break;
}
} while (System.currentTimeMillis() - startMillis < timeoutMillis);
recordsCount += records.size();
result.addAll(records);
} while (recordsCount < batchSize && System.currentTimeMillis() - startMillis < timeoutMillis);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ void assertZeroQueueSizeWorks() {
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
Thread thread = new Thread(() -> channel.push(records));
thread.start();
assertThat(channel.fetch(1, 500L), is(records));
assertThat(channel.fetch(1, 5L), is(records));
thread.join();
}

/**
 * Fetch with a zero timeout must immediately return records that were already pushed,
 * without waiting on the queue.
 */
@Test
void assertFetchWithZeroTimeout() {
    MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>()));
    List<Record> expectedRecords = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
    channel.push(expectedRecords);
    assertThat(channel.fetch(10, 0L), is(expectedRecords));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A
.createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
: new CDCImporter(channelProgressPairs, 1, 300L, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
PipelineTask incrementalTask = new CDCIncrementalTask(
dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,16 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext)
importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress);
Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
.createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
Collection<Importer> importers = createIncrementalImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}

private Collection<Importer> createImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel,
final PipelineJobProgressListener jobProgressListener) {
/**
 * Create the importers that drain the incremental channel, one importer per configured
 * concurrency slot, all consuming the same shared channel.
 *
 * <p>NOTE(review): the literals {@code 1} and {@code 5L} are passed as the second and third
 * constructor arguments of {@link SingleChannelConsumerImporter} — presumably batch size and
 * a timeout in milliseconds, mirroring {@code fetch(batchSize, timeoutMillis)}; confirm
 * against the constructor declaration.
 *
 * @param importerConfig importer configuration supplying the concurrency level
 * @param sink pipeline sink the importers write into
 * @param channel shared pipeline channel the importers consume from
 * @param jobProgressListener listener notified of job progress by each importer
 * @return created importers
 */
private Collection<Importer> createIncrementalImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel,
                                                        final PipelineJobProgressListener jobProgressListener) {
    Collection<Importer> result = new LinkedList<>();
    int remaining = importerConfig.getConcurrency();
    while (remaining > 0) {
        result.add(new SingleChannelConsumerImporter(channel, 1, 5L, sink, jobProgressListener));
        remaining--;
    }
    return result;
}
Expand Down

0 comments on commit 518f352

Please sign in to comment.