Skip to content

Commit e6509c7

Browse files
ruanwenjunruanwenjun
authored andcommitted
Use stmtHandleChangeLock in KyuubiStatement to ensure thread safe access and updates to stmtHandle.
1 parent a1a08e7 commit e6509c7

File tree

3 files changed

+93
-47
lines changed

3 files changed

+93
-47
lines changed

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
8888
private JdbcConnectionParams connParams;
8989
private TTransport transport;
9090
private TCLIService.Iface client;
91-
private boolean isClosed = true;
91+
private volatile boolean isClosed = true;
9292
private SQLWarning warningChain = null;
9393
private TSessionHandle sessHandle = null;
9494
private final List<TProtocolVersion> supportedProtocols = new LinkedList<>();

kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.common.annotations.VisibleForTesting;
2121
import java.sql.*;
2222
import java.util.*;
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
2325
import org.apache.commons.lang3.StringUtils;
2426
import org.apache.kyuubi.jdbc.hive.adapter.SQLStatement;
2527
import org.apache.kyuubi.jdbc.hive.cli.FetchType;
@@ -40,7 +42,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
4042
public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false";
4143
private final KyuubiConnection connection;
4244
private TCLIService.Iface client;
43-
private TOperationHandle stmtHandle = null;
45+
private volatile TOperationHandle stmtHandle = null;
46+
// This lock must be acquired before modifying or judge stmt
47+
// to ensure there are no concurrent accesses or race conditions.
48+
private final Lock stmtHandleChangeLock = new ReentrantLock();
4449
private final TSessionHandle sessHandle;
4550
Map<String, String> sessConf = new HashMap<>();
4651
private int fetchSize = DEFAULT_FETCH_SIZE;
@@ -67,10 +72,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
6772
private SQLWarning warningChain = null;
6873

6974
/** Keep state so we can fail certain calls made after close(). */
70-
private boolean isClosed = false;
75+
private volatile boolean isClosed = false;
7176

7277
/** Keep state so we can fail certain calls made after cancel(). */
73-
private boolean isCancelled = false;
78+
private volatile boolean isCancelled = false;
7479

7580
/** Keep this state so we can know whether the query in this statement is closed. */
7681
private boolean isQueryClosed = false;
@@ -124,23 +129,25 @@ public KyuubiStatement(
124129

125130
@Override
126131
public void cancel() throws SQLException {
127-
checkConnection("cancel");
128-
if (isCancelled) {
129-
return;
130-
}
131-
132132
try {
133+
checkConnection("cancel");
134+
if (isCancelled) {
135+
return;
136+
}
137+
stmtHandleChangeLock.lock();
133138
if (stmtHandle != null) {
134139
TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
135140
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
136141
Utils.verifySuccessWithInfo(cancelResp.getStatus());
137142
}
143+
isCancelled = true;
138144
} catch (SQLException e) {
139145
throw e;
140146
} catch (Exception e) {
141147
throw new KyuubiSQLException(e.toString(), "08S01", e);
148+
} finally {
149+
stmtHandleChangeLock.unlock();
142150
}
143-
isCancelled = true;
144151
}
145152

146153
@Override
@@ -177,13 +184,18 @@ void closeClientOperation() throws SQLException {
177184

178185
@Override
179186
public void close() throws SQLException {
180-
if (isClosed) {
181-
return;
187+
try {
188+
stmtHandleChangeLock.lock();
189+
if (isClosed) {
190+
return;
191+
}
192+
closeClientOperation();
193+
client = null;
194+
closeResultSet();
195+
isClosed = true;
196+
} finally {
197+
stmtHandleChangeLock.unlock();
182198
}
183-
closeClientOperation();
184-
client = null;
185-
closeResultSet();
186-
isClosed = true;
187199
}
188200

189201
@Override
@@ -312,38 +324,43 @@ private void runAsyncOnServer(String sql) throws SQLException {
312324
}
313325

314326
private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throws SQLException {
315-
checkConnection("execute");
316-
317-
reInitState();
318-
319-
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
320-
/**
321-
* Run asynchronously whenever possible Currently only a SQLOperation can be run asynchronously,
322-
* in a background operation thread Compilation can run asynchronously or synchronously and
323-
* execution run asynchronously
324-
*/
325-
execReq.setRunAsync(true);
326-
if (confOneTime != null) {
327-
Map<String, String> confOverlay = new HashMap<String, String>(sessConf);
328-
confOverlay.putAll(confOneTime);
329-
execReq.setConfOverlay(confOverlay);
330-
} else {
331-
execReq.setConfOverlay(sessConf);
332-
}
333-
execReq.setQueryTimeout(queryTimeout);
327+
stmtHandleChangeLock.lock();
334328
try {
335-
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
336-
Utils.verifySuccessWithInfo(execResp.getStatus());
337-
stmtHandle = execResp.getOperationHandle();
338-
isExecuteStatementFailed = false;
339-
} catch (SQLException eS) {
340-
isExecuteStatementFailed = true;
341-
isLogBeingGenerated = false;
342-
throw eS;
343-
} catch (Exception ex) {
344-
isExecuteStatementFailed = true;
345-
isLogBeingGenerated = false;
346-
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
329+
checkConnection("execute");
330+
331+
reInitState();
332+
333+
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
334+
/**
335+
* Run asynchronously whenever possible Currently only a SQLOperation can be run
336+
* asynchronously, in a background operation thread Compilation can run asynchronously or
337+
* synchronously and execution run asynchronously
338+
*/
339+
execReq.setRunAsync(true);
340+
if (confOneTime != null) {
341+
Map<String, String> confOverlay = new HashMap<>(sessConf);
342+
confOverlay.putAll(confOneTime);
343+
execReq.setConfOverlay(confOverlay);
344+
} else {
345+
execReq.setConfOverlay(sessConf);
346+
}
347+
execReq.setQueryTimeout(queryTimeout);
348+
try {
349+
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
350+
Utils.verifySuccessWithInfo(execResp.getStatus());
351+
stmtHandle = execResp.getOperationHandle();
352+
isExecuteStatementFailed = false;
353+
} catch (SQLException eS) {
354+
isExecuteStatementFailed = true;
355+
isLogBeingGenerated = false;
356+
throw eS;
357+
} catch (Exception ex) {
358+
isExecuteStatementFailed = true;
359+
isLogBeingGenerated = false;
360+
throw new KyuubiSQLException(ex.toString(), "08S01", ex);
361+
}
362+
} finally {
363+
stmtHandleChangeLock.unlock();
347364
}
348365
}
349366

kyuubi-hive-jdbc/src/test/java/org/apache/kyuubi/jdbc/hive/KyuubiStatementTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
package org.apache.kyuubi.jdbc.hive;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertThrows;
2122

2223
import java.sql.SQLException;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import org.junit.Assert;
2327
import org.junit.Test;
2428

2529
public class KyuubiStatementTest {
@@ -54,4 +58,29 @@ public void testaddBatch() throws SQLException {
5458
stmt.addBatch(null);
5559
}
5660
}
61+
62+
@Test
63+
public void testThrowKyuubiSQLExceptionWhenExecuteSqlOnClosedStmt() throws SQLException {
64+
KyuubiStatement stmt = new KyuubiStatement(null, null, null);
65+
try {
66+
ExecutorService executorService = Executors.newFixedThreadPool(2);
67+
executorService.submit(
68+
() -> {
69+
try {
70+
stmt.close();
71+
} catch (SQLException e) {
72+
throw new RuntimeException(e);
73+
}
74+
});
75+
executorService.submit(
76+
() -> {
77+
Assert.assertEquals(
78+
"Can't exectue after statement has been closed",
79+
assertThrows(KyuubiSQLException.class, () -> stmt.execute("SELECT 1"))
80+
.getMessage());
81+
});
82+
} finally {
83+
stmt.close();
84+
}
85+
}
5786
}

0 commit comments

Comments
 (0)