Skip to content

Commit

Permalink
improve sleep interrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Dec 5, 2023
1 parent a493509 commit 9f4b2b4
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public void pushRecords(final List<Record> records) {
queue.put(records);
}

@SneakyThrows(InterruptedException.class)
// TODO thread-safe?
@Override
public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
Expand All @@ -60,7 +59,11 @@ public List<Record> fetchRecords(final int batchSize, final long timeout, final
do {
List<Record> records = queue.poll();
if (null == records || records.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
try {
TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
} catch (final InterruptedException ignored) {
return Collections.emptyList();
}
} else {
recordsCount += records.size();
result.addAll(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ protected void runBlocking() {
int times = reconnectTimes.incrementAndGet();
log.error("Connect failed, reconnect times={}", times, ex);
if (isRunning()) {
sleepIgnoreInterrupt(5000L);
try {
Thread.sleep(5000L);
} catch (final InterruptedException ignored) {
break;
}
}
if (times >= 5) {
throw new IngestException(ex);
Expand All @@ -103,13 +107,6 @@ protected void runBlocking() {
}
}

private void sleepIgnoreInterrupt(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignored) {
}
}

private void dump() throws SQLException {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
Expand All @@ -119,7 +116,11 @@ private void dump() throws SQLException {
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
sleepIgnoreInterrupt(5000L);
try {
Thread.sleep(5000L);
} catch (final InterruptedException ignored) {
break;
}
continue;
}
AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
import org.apache.shardingsphere.infra.exception.TableNotExistsException;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
Expand Down Expand Up @@ -57,13 +58,19 @@ public static Map<String, Set<String>> parseTableExpressionWithSchema(final Shar
return parseTableExpressionWithAllTables(database, systemSchemas);
}
Map<String, Set<String>> result = new HashMap<>();
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
for (SchemaTable each : schemaTables) {
if ("*".equals(each.getSchema())) {
result.putAll(parseTableExpressionWithAllSchema(database, systemSchemas, each));
} else if ("*".equals(each.getTable())) {
result.putAll(parseTableExpressionWithAllTable(database, each));
} else {
result.computeIfAbsent(each.getSchema(), ignored -> new HashSet<>()).add(each.getTable());
String schemaName = each.getSchema();
if (dialectDatabaseMetaData.getDefaultSchema().isPresent() && schemaName.isEmpty()) {
schemaName = dialectDatabaseMetaData.getDefaultSchema().get();
}
ShardingSpherePreconditions.checkNotNull(database.getSchema(schemaName).getTable(each.getTable()), () -> new TableNotExistsException(each.getTable()));
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(each.getTable());
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void assertParseTableExpression() {
assertThat(actual, is(expected));
schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, schemaTables);
expected = Collections.singletonMap("", Collections.singleton("t_order"));
expected = Collections.singletonMap("public", Collections.singleton("t_order"));
assertThat(actual, is(expected));
schemaTables = Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, schemaTables);
Expand Down

0 comments on commit 9f4b2b4

Please sign in to comment.