From e90511824f322146bce1a5ad77ef9609e165aa40 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Fri, 20 Dec 2024 16:34:24 -0500 Subject: [PATCH 1/6] Adjusting cancel and ensure close --- .../com/clickhouse/jdbc/ResultSetImpl.java | 40 ++++++++++++------- .../com/clickhouse/jdbc/StatementImpl.java | 19 +++++---- .../jdbc/internal/ExceptionUtils.java | 8 ++++ 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java index b71f4fa62..290c2499f 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java @@ -6,6 +6,7 @@ import java.io.Reader; import java.math.BigDecimal; import java.net.MalformedURLException; +import java.net.SocketException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.*; @@ -68,23 +69,34 @@ public boolean next() throws SQLException { @Override public void close() throws SQLException { closed = true; - if (reader != null) { - try { - reader.close(); - } catch (Exception e) { - throw ExceptionUtils.toSqlState(e); - } - reader = null; + Exception e = null; + try { + if (reader != null) { + try { + reader.close(); + } catch (Exception re) { + log.debug("Error closing reader", re); + e = re; + } finally { + reader = null; + } + } + } finally { + if (response != null) { + try { + response.close(); + } catch (Exception re) { + log.debug("Error closing response", re); + e = re; + } finally { + response = null; + } + } } - if (response != null) { - try { - response.close(); - } catch (Exception e) { - throw ExceptionUtils.toSqlState(e); - } - response = null; + if (e != null) { + throw ExceptionUtils.toSqlState(e); } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 5484db68d..18bc8cfdd 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -32,7 +32,7 @@ public class StatementImpl implements Statement, JdbcV2Wrapper { private OperationMetrics metrics; private List batch; private String lastSql; - private String lastQueryId; + private volatile String lastQueryId; private String schema; private int maxRows; @@ -207,8 +207,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException public void close() throws SQLException { closed = true; if (currentResultSet != null) { - currentResultSet.close(); - currentResultSet = null; + try { + currentResultSet.close(); + } catch (Exception e) { + LOG.debug("Failed to close current result set", e); + } finally { + currentResultSet = null; + } } } @@ -262,10 +267,10 @@ public void cancel() throws SQLException { return; } - try { - connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'", - connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ", - lastQueryId), connection.getDefaultQuerySettings()).get(); + try (QueryResponse response = connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'", + connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ", + lastQueryId), connection.getDefaultQuerySettings()).get()){ + LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId()); } catch (Exception e) { throw new SQLException(e); } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/ExceptionUtils.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/ExceptionUtils.java index 58bbc54fa..82cd00180 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/ExceptionUtils.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/ExceptionUtils.java @@ -61,4 +61,12 @@ public static SQLException toSqlState(String message, Exception cause) { return new SQLException(exceptionMessage, SQL_STATE_CLIENT_ERROR, cause);//Default } + + public static Throwable getRootCause(Throwable throwable) { + Throwable cause = throwable; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + return cause; + } } From 2c015320a7347e5b2e944de828bb89184b3faeef Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Fri, 20 Dec 2024 17:56:37 -0500 Subject: [PATCH 2/6] Update StatementImpl.java --- .../java/com/clickhouse/jdbc/StatementImpl.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 18bc8cfdd..ead3d8433 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; public class StatementImpl implements Statement, JdbcV2Wrapper { @@ -143,6 +144,13 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL checkClosed(); QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings); + if (settings.getQueryId() != null) { + lastQueryId = settings.getQueryId(); + } else { + lastQueryId = UUID.randomUUID().toString(); + settings.setQueryId(lastQueryId); + } + try { lastSql = parseJdbcEscapeSyntax(sql); QueryResponse response; @@ -166,7 +174,6 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL } currentResultSet = new ResultSetImpl(this, response, reader); metrics = response.getMetrics(); - lastQueryId = response.getQueryId(); } catch (Exception e) { throw ExceptionUtils.toSqlState(e); } @@ -190,6 +197,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings); + if (settings.getQueryId() != null) { + lastQueryId = settings.getQueryId(); + } else { + lastQueryId = UUID.randomUUID().toString(); + settings.setQueryId(lastQueryId); + } + lastSql = parseJdbcEscapeSyntax(sql); try (QueryResponse response = queryTimeout == 0 ? connection.client.query(lastSql, mergedSettings).get() : connection.client.query(lastSql, mergedSettings).get(queryTimeout, TimeUnit.SECONDS)) { From af70e859c6e077189b18422e72e298fc966df128 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Fri, 20 Dec 2024 18:15:38 -0500 Subject: [PATCH 3/6] Update StatementImpl.java --- jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index ead3d8433..9d7e06d15 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -150,6 +150,7 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL lastQueryId = UUID.randomUUID().toString(); settings.setQueryId(lastQueryId); } + LOG.debug("Query ID: {}", lastQueryId); try { lastSql = parseJdbcEscapeSyntax(sql); @@ -312,7 +313,7 @@ public boolean execute(String sql) throws SQLException { return execute(sql, new QuerySettings().setDatabase(schema)); } - private boolean execute(String sql, QuerySettings settings) throws SQLException { + public boolean execute(String sql, QuerySettings settings) throws SQLException { checkClosed(); StatementType type = parseStatementType(sql); From 202b0a49651221c3ad1a49db48f6078dbe56fd65 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Fri, 20 Dec 2024 22:09:05 -0500 Subject: [PATCH 4/6] Update StatementTest.java --- .../com/clickhouse/jdbc/StatementTest.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java index 314e7be92..bbc3407ac 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java @@ -3,6 +3,9 @@ import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QuerySettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -18,6 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.testng.Assert.*; @@ -29,6 +33,8 @@ public class StatementTest extends JdbcIntegrationTest { + private static final Logger log = LoggerFactory.getLogger(StatementTest.class); + @Test(groups = { "integration" }) public void testExecuteQuerySimpleNumbers() throws Exception { try (Connection conn = getJdbcConnection()) { @@ -471,4 +477,28 @@ public void testConnectionExhaustion() throws Exception { } } } + + @Test(groups = { "integration" }) + public void testConcurrentCancel() throws Exception { + try (Connection conn = getJdbcConnection()) { + try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { + stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000"); + stmt.cancel(); + } + try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { + new Thread(() -> { + try { + ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000"); + rs.next(); + log.info(rs.getObject(1).toString()); + } catch (SQLException e) { + log.error("Error in thread", e); + } + }).start(); + + Thread.sleep(1000); + stmt.cancel(); + } + } + } } From 3459dccdc0a236c693c4c7a93acdf90ebc863c02 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Sat, 21 Dec 2024 13:51:29 -0500 Subject: [PATCH 5/6] Update StatementImpl.java --- .../main/java/com/clickhouse/jdbc/StatementImpl.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 9d7e06d15..0647aede7 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -144,11 +144,11 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL checkClosed(); QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings); - if (settings.getQueryId() != null) { - lastQueryId = settings.getQueryId(); + if (mergedSettings.getQueryId() != null) { + lastQueryId = mergedSettings.getQueryId(); } else { lastQueryId = UUID.randomUUID().toString(); - settings.setQueryId(lastQueryId); + mergedSettings.setQueryId(lastQueryId); } LOG.debug("Query ID: {}", lastQueryId); @@ -198,11 +198,11 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings); - if (settings.getQueryId() != null) { - lastQueryId = settings.getQueryId(); + if (mergedSettings.getQueryId() != null) { + lastQueryId = mergedSettings.getQueryId(); } else { lastQueryId = UUID.randomUUID().toString(); - settings.setQueryId(lastQueryId); + mergedSettings.setQueryId(lastQueryId); } lastSql = parseJdbcEscapeSyntax(sql); From 9208ba9428dc6faf249919dd6605adac10118641 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Sat, 21 Dec 2024 14:05:10 -0500 Subject: [PATCH 6/6] Update StatementTest.java --- .../com/clickhouse/jdbc/StatementTest.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java index bbc3407ac..832e13d6c 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java @@ -480,24 +480,29 @@ public void testConnectionExhaustion() throws Exception { @Test(groups = { "integration" }) public void testConcurrentCancel() throws Exception { + int maxNumConnections = 3; + try (Connection conn = getJdbcConnection()) { try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000"); stmt.cancel(); } - try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { - new Thread(() -> { - try { - ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000"); - rs.next(); - log.info(rs.getObject(1).toString()); - } catch (SQLException e) { - log.error("Error in thread", e); - } - }).start(); - - Thread.sleep(1000); - stmt.cancel(); + for (int i = 0; i < maxNumConnections; i++) { + try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { + final int threadNum = i; + log.info("Starting thread {}", threadNum); + new Thread(() -> { + try { + ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000"); + rs.next(); + log.info(rs.getObject(1).toString()); + } catch (SQLException e) { + log.error("Error in thread {}", threadNum, e); + } + }).start(); + + stmt.cancel(); + } } } }