Skip to content

Commit

Permalink
Improve CDCImporter finished flag
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 6, 2023
1 parent 55c71a6 commit 152f56c
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
Expand All @@ -34,6 +33,7 @@
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
Expand Down Expand Up @@ -86,7 +86,7 @@ protected void runBlocking() {
} else {
doWithoutSorting(channelProgressPairs);
}
if (channelProgressPairs.isEmpty()) {
if (channelProgressPairs.isEmpty() && ackCache.estimatedSize() == 0) {
break;
}
}
Expand Down Expand Up @@ -239,6 +239,7 @@ public void ack(final String ackId) {
each.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
}
ackCache.invalidate(ackId);
}

@Override
Expand Down

0 comments on commit 152f56c

Please sign in to comment.