diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 6955d41cc..d2e364d0f 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -63,6 +63,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -1336,7 +1337,7 @@ public CompletableFuture insert(String tableName, List data, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); - ClientException lastException = null; + RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1376,19 +1377,17 @@ public CompletableFuture insert(String tableName, List data, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) { - lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + } catch (Exception e) { + lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e); if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { - LOG.warn("Retrying", e); + LOG.warn("Retrying.", e); selectedNode = getNextAliveNode(); } else { throw lastException; } - } catch (IOException e) { - throw new ClientException("Insert request failed", e); } } - throw new ClientException("Insert request failed after retries", lastException); + throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException); }; return runAsyncOperation(supplier, settings.getAllSettings()); @@ -1513,7 +1512,7 @@ public CompletableFuture insert(String tableName, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); - ClientException lastException = null; + RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1538,16 +1537,14 @@ public CompletableFuture insert(String tableName, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException | ConnectException e) { - lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + } catch (Exception e) { + lastException = httpClientHelper.wrapException("Query request failed (Attempt " + (i + 1) + "/" + (maxRetries + 1) + ")", e); if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { - LOG.warn("Retrying", e); + LOG.warn("Retrying.", e); selectedNode = getNextAliveNode(); } else { throw lastException; } - } catch (IOException e) { - throw new ClientException("Insert request failed", e); } if (i < maxRetries) { @@ -1558,7 +1555,7 @@ public CompletableFuture insert(String tableName, } } } - throw new ClientException("Insert request failed after retries", lastException); + throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1), lastException); }; } else { responseSupplier = () -> { @@ -1672,7 +1669,7 @@ public CompletableFuture query(String sqlQuery, Map { // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); - ClientException lastException = null; + RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { try { ClassicHttpResponse httpResponse = @@ -1699,22 +1696,18 @@ public CompletableFuture query(String sqlQuery, Map request = oldClient.read(getServerNode()); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java index 8d66f8136..d0e4f3212 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java @@ -7,4 +7,5 @@ public enum ClientFaultCause { NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout, + SocketTimeout, } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 9dc5c648d..47073da96 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -60,6 +60,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -108,7 +109,7 @@ public HttpAPIClientHelper(Map configuration) { boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true"); LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression); - defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class); + defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey()), ClientFaultCause.class); if (defaultRetryCauses.contains(ClientFaultCause.None)) { defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None); } @@ -398,7 +399,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map T getHeaderVal(Header header, T defaultValue, Function requestSettings) { + public boolean shouldRetry(Throwable ex, Map requestSettings) { Set retryCauses = (Set) - requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses); + requestSettings.getOrDefault(ClientConfigProperties.CLIENT_RETRY_ON_FAILURE.getKey(), defaultRetryCauses); if (retryCauses.contains(ClientFaultCause.None)) { return false; } - if (ex instanceof NoHttpResponseException ) { + if (ex instanceof NoHttpResponseException + || ex.getCause() instanceof NoHttpResponseException) { return retryCauses.contains(ClientFaultCause.NoHttpResponse); } - if (ex instanceof ConnectException || ex instanceof ConnectTimeoutException) { + if (ex instanceof ConnectException + || ex instanceof ConnectTimeoutException + || ex.getCause() instanceof ConnectException + || ex.getCause() instanceof ConnectTimeoutException) { return retryCauses.contains(ClientFaultCause.ConnectTimeout); } - if (ex instanceof ConnectionRequestTimeoutException) { + if (ex instanceof ConnectionRequestTimeoutException + || ex.getCause() instanceof ConnectionRequestTimeoutException) { return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout); } + if (ex instanceof SocketTimeoutException + || ex.getCause() instanceof SocketTimeoutException) { + return retryCauses.contains(ClientFaultCause.SocketTimeout); + } + return false; } // This method wraps some client specific exceptions into specific ClientException or just ClientException // ClientException will be also wrapped - public ClientException wrapException(String message, Exception cause) { + public RuntimeException wrapException(String message, Exception cause) { + if (cause instanceof ClientException || cause instanceof ServerException) { + return (RuntimeException) cause; + } + if (cause instanceof ConnectionRequestTimeoutException || cause instanceof NoHttpResponseException || cause instanceof ConnectTimeoutException || diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 21a392847..7d2527cff 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -607,6 +607,8 @@ public void testServerSettings() { } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e); + } finally { + mockServer.stop(); } } } @@ -1062,6 +1064,59 @@ public void testWithDefaultTimeouts() { } } + + @Test(groups = { "integration" }) + public void testTimeoutsWithRetry() { + if (isCloud()) { + return; // mocked server + } + + WireMockServer faultyServer = new WireMockServer( WireMockConfiguration + .options().port(9090).notifier(new ConsoleNotifier(false))); + faultyServer.start(); + + // First request gets no response + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Timeout") + .withRequestBody(WireMock.containing("SELECT 1")) + .whenScenarioStateIs(STARTED) + .willSetStateTo("Failed") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withFixedDelay(5000) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + // Second request gets a response (retry) + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .inScenario("Timeout") + .withRequestBody(WireMock.containing("SELECT 1")) + .whenScenarioStateIs("Failed") + .willSetStateTo("Done") + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withFixedDelay(1000) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + try (Client client = new Client.Builder().addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) + .setUsername("default") + .setPassword("") + .setSocketTimeout(3000) + .retryOnFailures(ClientFaultCause.SocketTimeout) + .build()) { + int startTime = (int) System.currentTimeMillis(); + try { + client.query("SELECT 1").get(); + } catch (Exception e) { + Assert.fail("Elapsed Time: " + (System.currentTimeMillis() - startTime), e); + } + } finally { + faultyServer.stop(); + } + } + + protected Client.Builder newClient() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); boolean isSecure = isCloud();