Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjusting cancel and ensure close #2046

Merged
merged 7 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.Reader;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.*;
Expand Down Expand Up @@ -68,23 +69,34 @@ public boolean next() throws SQLException {
@Override
public void close() throws SQLException {
closed = true;
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}

reader = null;
Exception e = null;
try {
if (reader != null) {
try {
reader.close();
} catch (Exception re) {
log.debug("Error closing reader", re);
e = re;
} finally {
reader = null;
}
}
} finally {
if (response != null) {
try {
response.close();
} catch (Exception re) {
log.debug("Error closing response", re);
e = re;
} finally {
response = null;
}
}
}

if (response != null) {
try {
response.close();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
response = null;
if (e != null) {
throw ExceptionUtils.toSqlState(e);
}
}

Expand Down
38 changes: 29 additions & 9 deletions jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class StatementImpl implements Statement, JdbcV2Wrapper {
Expand All @@ -34,7 +35,7 @@ public class StatementImpl implements Statement, JdbcV2Wrapper {
private OperationMetrics metrics;
private List<String> batch;
private String lastSql;
private String lastQueryId;
private volatile String lastQueryId;
private String schema;
private int maxRows;

Expand Down Expand Up @@ -146,6 +147,14 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
checkClosed();
QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);

if (mergedSettings.getQueryId() != null) {
lastQueryId = mergedSettings.getQueryId();
} else {
lastQueryId = UUID.randomUUID().toString();
mergedSettings.setQueryId(lastQueryId);
}
LOG.debug("Query ID: {}", lastQueryId);

try {
lastSql = parseJdbcEscapeSyntax(sql);
QueryResponse response;
Expand All @@ -169,7 +178,6 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
}
currentResultSet = new ResultSetImpl(this, response, reader);
metrics = response.getMetrics();
lastQueryId = response.getQueryId();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand All @@ -193,6 +201,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException

QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);

if (mergedSettings.getQueryId() != null) {
lastQueryId = mergedSettings.getQueryId();
} else {
lastQueryId = UUID.randomUUID().toString();
mergedSettings.setQueryId(lastQueryId);
}

lastSql = parseJdbcEscapeSyntax(sql);
int updateCount = 0;
try (QueryResponse response = queryTimeout == 0 ? connection.client.query(lastSql, mergedSettings).get()
Expand All @@ -212,8 +227,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException
public void close() throws SQLException {
closed = true;
if (currentResultSet != null) {
currentResultSet.close();
currentResultSet = null;
try {
currentResultSet.close();
} catch (Exception e) {
LOG.debug("Failed to close current result set", e);
} finally {
currentResultSet = null;
}
}
}

Expand Down Expand Up @@ -267,10 +287,10 @@ public void cancel() throws SQLException {
return;
}

try {
connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
lastQueryId), connection.getDefaultQuerySettings()).get();
try (QueryResponse response = connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
lastQueryId), connection.getDefaultQuerySettings()).get()){
Paultagoras marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId());
} catch (Exception e) {
throw new SQLException(e);
}
Expand Down Expand Up @@ -298,7 +318,7 @@ public boolean execute(String sql) throws SQLException {
return execute(sql, new QuerySettings().setDatabase(schema));
}

private boolean execute(String sql, QuerySettings settings) throws SQLException {
public boolean execute(String sql, QuerySettings settings) throws SQLException {
checkClosed();
StatementType type = parseStatementType(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,12 @@ public static SQLException toSqlState(String message, Exception cause) {

return new SQLException(exceptionMessage, SQL_STATE_CLIENT_ERROR, cause);//Default
}

public static Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
}
36 changes: 36 additions & 0 deletions jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QuerySettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import com.clickhouse.data.ClickHouseVersion;
import org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.Test;
Expand All @@ -18,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -27,6 +32,8 @@


public class StatementTest extends JdbcIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(StatementTest.class);

@Test(groups = { "integration" })
public void testExecuteQuerySimpleNumbers() throws Exception {
try (Connection conn = getJdbcConnection()) {
Expand Down Expand Up @@ -492,4 +499,33 @@ public void testConnectionExhaustion() throws Exception {
}
}
}

@Test(groups = { "integration" })
public void testConcurrentCancel() throws Exception {
int maxNumConnections = 3;

try (Connection conn = getJdbcConnection()) {
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
Paultagoras marked this conversation as resolved.
Show resolved Hide resolved
stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
stmt.cancel();
}
for (int i = 0; i < maxNumConnections; i++) {
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
final int threadNum = i;
log.info("Starting thread {}", threadNum);
new Thread(() -> {
try {
ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
rs.next();
log.info(rs.getObject(1).toString());
} catch (SQLException e) {
log.error("Error in thread {}", threadNum, e);
}
}).start();

stmt.cancel();
}
}
}
}
}
Loading