Skip to content

Commit

Permalink
Revert sleep interrupt changes
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Dec 5, 2023
1 parent 9f4b2b4 commit 9546df5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void pushRecords(final List<Record> records) {
queue.put(records);
}

@SneakyThrows(InterruptedException.class)
// TODO thread-safe?
@Override
public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
Expand All @@ -59,11 +60,7 @@ public List<Record> fetchRecords(final int batchSize, final long timeout, final
do {
List<Record> records = queue.poll();
if (null == records || records.isEmpty()) {
try {
TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
} catch (final InterruptedException ignored) {
return Collections.emptyList();
}
TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
} else {
recordsCount += records.size();
result.addAll(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

package org.apache.shardingsphere.data.pipeline.opengauss.ingest;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
Expand Down Expand Up @@ -83,6 +84,7 @@ public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final In
this.decodeWithTX = dumperContext.isDecodeWithTX();
}

@SneakyThrows(InterruptedException.class)
@Override
protected void runBlocking() {
AtomicInteger reconnectTimes = new AtomicInteger();
Expand All @@ -94,11 +96,7 @@ protected void runBlocking() {
int times = reconnectTimes.incrementAndGet();
log.error("Connect failed, reconnect times={}", times, ex);
if (isRunning()) {
try {
Thread.sleep(5000L);
} catch (final InterruptedException ignored) {
break;
}
Thread.sleep(5000L);
}
if (times >= 5) {
throw new IngestException(ex);
Expand All @@ -107,6 +105,7 @@ protected void runBlocking() {
}
}

@SneakyThrows(InterruptedException.class)
private void dump() throws SQLException {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
Expand All @@ -116,11 +115,7 @@ private void dump() throws SQLException {
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
try {
Thread.sleep(5000L);
} catch (final InterruptedException ignored) {
break;
}
Thread.sleep(10L);
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
Expand Down

0 comments on commit 9546df5

Please sign in to comment.