Skip to content

Commit

Permalink
Merge pull request #2086 from ClickHouse/adjusting-retry-mechanism
Browse files Browse the repository at this point in the history
Adjusting retry mechanism
  • Loading branch information
Paultagoras authored Jan 14, 2025
2 parents fa53726 + 316bbac commit 7a15ecb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 30 deletions.
37 changes: 15 additions & 22 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
Expand Down Expand Up @@ -1336,7 +1337,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
RuntimeException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand Down Expand Up @@ -1376,19 +1377,17 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
} catch (Exception e) {
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
LOG.warn("Retrying.", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (IOException e) {
throw new ClientException("Insert request failed", e);
}
}
throw new ClientException("Insert request failed after retries", lastException);
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException);
};

return runAsyncOperation(supplier, settings.getAllSettings());
Expand Down Expand Up @@ -1513,7 +1512,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
RuntimeException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand All @@ -1538,16 +1537,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
} catch (Exception e) {
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
LOG.warn("Retrying.", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (IOException e) {
throw new ClientException("Insert request failed", e);
}

if (i < maxRetries) {
Expand All @@ -1558,7 +1555,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
}
}
}
throw new ClientException("Insert request failed after retries", lastException);
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException);
};
} else {
responseSupplier = () -> {
Expand Down Expand Up @@ -1672,7 +1669,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();
ClientException lastException = null;
RuntimeException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
try {
ClassicHttpResponse httpResponse =
Expand All @@ -1699,22 +1696,18 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec

return new QueryResponse(httpResponse, finalSettings.getFormat(), finalSettings, metrics);

} catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) {
lastException = httpClientHelper.wrapException("Query request initiation failed", e);
} catch (Exception e) {
lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (ClientException | ServerException e) {
throw e;
} catch (Exception e) {
throw new ClientException("Query request failed", e);
}
}

throw new ClientException("Query request failed after retries", lastException);
throw new ClientException("Query request failed after attempts: " + (maxRetries + 1), lastException);
};
} else {
ClickHouseRequest<?> request = oldClient.read(getServerNode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public enum ClientFaultCause {
NoHttpResponse,
ConnectTimeout,
ConnectionRequestTimeout,
SocketTimeout,
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
Expand Down Expand Up @@ -108,7 +109,7 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);

defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class);
defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey()), ClientFaultCause.class);
if (defaultRetryCauses.contains(ClientFaultCause.None)) {
defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None);
}
Expand Down Expand Up @@ -398,7 +399,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
} catch (ConnectException | NoRouteToHostException e) {
LOG.warn("Failed to connect to '{}': {}", server.getHost(), e.getMessage());
throw new ClientException("Failed to connect", e);
} catch (ConnectionRequestTimeoutException | ServerException | NoHttpResponseException | ClientException e) {
} catch (ConnectionRequestTimeoutException | ServerException | NoHttpResponseException | ClientException | SocketTimeoutException e) {
throw e;
} catch (Exception e) {
throw new ClientException("Failed to execute request", e);
Expand Down Expand Up @@ -576,32 +577,46 @@ public static <T> T getHeaderVal(Header header, T defaultValue, Function<String,
return converter.apply(header.getValue());
}

public boolean shouldRetry(Exception ex, Map<String, Object> requestSettings) {
public boolean shouldRetry(Throwable ex, Map<String, Object> requestSettings) {
Set<ClientFaultCause> retryCauses = (Set<ClientFaultCause>)
requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses);
requestSettings.getOrDefault(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey(), defaultRetryCauses);

if (retryCauses.contains(ClientFaultCause.None)) {
return false;
}

if (ex instanceof NoHttpResponseException ) {
if (ex instanceof NoHttpResponseException
|| ex.getCause() instanceof NoHttpResponseException) {
return retryCauses.contains(ClientFaultCause.NoHttpResponse);
}

if (ex instanceof ConnectException || ex instanceof ConnectTimeoutException) {
if (ex instanceof ConnectException
|| ex instanceof ConnectTimeoutException
|| ex.getCause() instanceof ConnectException
|| ex.getCause() instanceof ConnectTimeoutException) {
return retryCauses.contains(ClientFaultCause.ConnectTimeout);
}

if (ex instanceof ConnectionRequestTimeoutException) {
if (ex instanceof ConnectionRequestTimeoutException
|| ex.getCause() instanceof ConnectionRequestTimeoutException) {
return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout);
}

if (ex instanceof SocketTimeoutException
|| ex.getCause() instanceof SocketTimeoutException) {
return retryCauses.contains(ClientFaultCause.SocketTimeout);
}

return false;
}

// This method wraps some client specific exceptions into specific ClientException or just ClientException
// ClientException will be also wrapped
public ClientException wrapException(String message, Exception cause) {
public RuntimeException wrapException(String message, Exception cause) {
if (cause instanceof ClientException || cause instanceof ServerException) {
return (RuntimeException) cause;
}

if (cause instanceof ConnectionRequestTimeoutException ||
cause instanceof NoHttpResponseException ||
cause instanceof ConnectTimeoutException ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ public void testServerSettings() {
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Unexpected exception", e);
} finally {
mockServer.stop();
}
}
}
Expand Down Expand Up @@ -1062,6 +1064,59 @@ public void testWithDefaultTimeouts() {
}
}


@Test(groups = { "integration" })
public void testTimeoutsWithRetry() {
if (isCloud()) {
return; // mocked server
}

WireMockServer faultyServer = new WireMockServer( WireMockConfiguration
.options().port(9090).notifier(new ConsoleNotifier(false)));
faultyServer.start();

// First request gets no response
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.inScenario("Timeout")
.withRequestBody(WireMock.containing("SELECT 1"))
.whenScenarioStateIs(STARTED)
.willSetStateTo("Failed")
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withFixedDelay(5000)
.withHeader("X-ClickHouse-Summary",
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());

// Second request gets a response (retry)
faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.inScenario("Timeout")
.withRequestBody(WireMock.containing("SELECT 1"))
.whenScenarioStateIs("Failed")
.willSetStateTo("Done")
.willReturn(WireMock.aResponse()
.withStatus(HttpStatus.SC_OK)
.withFixedDelay(1000)
.withHeader("X-ClickHouse-Summary",
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build());

try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false)
.setUsername("default")
.setPassword("")
.setSocketTimeout(3000)
.retryOnFailures(ClientFaultCause.SocketTimeout)
.build()) {
int startTime = (int) System.currentTimeMillis();
try {
client.query("SELECT 1").get();
} catch (Exception e) {
Assert.fail("Elapsed Time: " + (System.currentTimeMillis() - startTime), e);
}
} finally {
faultyServer.stop();
}
}


protected Client.Builder newClient() {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
boolean isSecure = isCloud();
Expand Down

0 comments on commit 7a15ecb

Please sign in to comment.