Skip to content

Commit ac941a5

Browse files
committed
[FLINK-38516][Table SQL/Gateway] Add config for read-only sql gateway
1 parent 58035ea commit ac941a5

File tree

4 files changed

+97
-4
lines changed

4 files changed

+97
-4
lines changed

docs/content/docs/dev/table/sql-gateway/overview.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,12 @@ $ ./sql-gateway -Dkey=value
250250
<td>Duration</td>
251251
<td>Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.</td>
252252
</tr>
253+
<tr>
254+
<td><h5>sql-gateway.read-only</h5></td>
255+
<td style="word-wrap: break-word;">false</td>
256+
<td>Boolean</td>
257+
<td>When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.</td>
258+
</tr>
253259
<tr>
254260
<td><h5>sql-gateway.worker.threads.max</h5></td>
255261
<td style="word-wrap: break-word;">500</td>

flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,11 @@ public class SqlGatewayServiceConfigOptions {
9797
.withDescription(
9898
"Keepalive time for an idle worker thread. When the number of workers exceeds min workers, "
9999
+ "excessive threads are killed after this time interval.");
100+
101+
public static final ConfigOption<Boolean> SQL_GATEWAY_READ_ONLY_MODE =
102+
key("sql-gateway.read-only")
103+
.booleanType()
104+
.defaultValue(false)
105+
.withDescription(
106+
"When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.");
100107
}

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.flink.table.factories.PlannerFactoryUtil;
6161
import org.apache.flink.table.functions.FunctionDefinition;
6262
import org.apache.flink.table.functions.FunctionIdentifier;
63+
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
6364
import org.apache.flink.table.gateway.api.operation.OperationHandle;
6465
import org.apache.flink.table.gateway.api.results.FunctionInfo;
6566
import org.apache.flink.table.gateway.api.results.TableInfo;
@@ -520,13 +521,13 @@ private ResultFetcher executeOperation(
520521
"No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'.");
521522
} else if (op instanceof ModifyOperation) {
522523
return callModifyOperations(
523-
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
524+
tableEnv, handle, Collections.singletonList((ModifyOperation) op), statement);
524525
} else if (op instanceof CompileAndExecutePlanOperation
525526
|| op instanceof ExecutePlanOperation) {
526527
return callExecuteOperation(tableEnv, handle, op);
527528
} else if (op instanceof StatementSetOperation) {
528529
return callModifyOperations(
529-
tableEnv, handle, ((StatementSetOperation) op).getOperations());
530+
tableEnv, handle, ((StatementSetOperation) op).getOperations(), statement);
530531
} else if (op instanceof QueryOperation) {
531532
TableResultInternal result =
532533
cachedPlan != null
@@ -673,14 +674,30 @@ private ResultFetcher callEndStatementSetOperation(
673674
// there's no statement in the statement set, skip submitting
674675
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
675676
} else {
676-
return callModifyOperations(tableEnv, handle, statementSetOperations);
677+
return callModifyOperations(tableEnv, handle, statementSetOperations, null);
677678
}
678679
}
679680

680681
private ResultFetcher callModifyOperations(
681682
TableEnvironmentInternal tableEnv,
682683
OperationHandle handle,
683-
List<ModifyOperation> modifyOperations) {
684+
List<ModifyOperation> modifyOperations,
685+
String originalSql) {
686+
// Check if SQL Gateway is in read-only mode
687+
Configuration configuration = sessionContext.getSessionConf().clone();
688+
configuration.addAll(executionConfig);
689+
boolean isReadOnlyMode =
690+
configuration.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE);
691+
692+
if (isReadOnlyMode) {
693+
String errorMessage =
694+
(originalSql != null && !originalSql.trim().isEmpty())
695+
? String.format(
696+
"The following statement is not allowed: %s", originalSql)
697+
: "Modification operations are not allowed.";
698+
throw new SqlExecutionException("SQL Gateway is in read-only mode. " + errorMessage);
699+
}
700+
684701
TableResultInternal result = tableEnv.executeInternal(modifyOperations);
685702
// DeleteFromFilterOperation doesn't have a JobClient
686703
if (modifyOperations.size() == 1

flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
111111
import static org.apache.flink.table.functions.FunctionKind.OTHER;
112112
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
113+
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE;
113114
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
114115
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
115116
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
@@ -1048,6 +1049,68 @@ void testGetOperationSchemaWhenOperationGetError() throws Exception {
10481049
.satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
10491050
}
10501051

1052+
@Test
1053+
void testReadOnlyModeWithModificationOperations() {
1054+
Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
1055+
config.set(SQL_GATEWAY_READ_ONLY_MODE, true);
1056+
1057+
SessionEnvironment sessionEnvironment =
1058+
SessionEnvironment.newBuilder()
1059+
.setSessionEndpointVersion(MockedEndpointVersion.V1)
1060+
.build();
1061+
1062+
SessionHandle sessionHandle = service.openSession(sessionEnvironment);
1063+
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
1064+
String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
1065+
1066+
service.executeStatement(sessionHandle, sourceDdl, -1, config);
1067+
service.executeStatement(sessionHandle, sinkDdl, -1, config);
1068+
1069+
OperationHandle insertOperationHandle =
1070+
service.executeStatement(
1071+
sessionHandle, "INSERT INTO sink SELECT * FROM source;", -1, config);
1072+
1073+
assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, insertOperationHandle))
1074+
.satisfies(
1075+
anyCauseMatches(
1076+
SqlExecutionException.class,
1077+
"SQL Gateway is in read-only mode. The following statement is not allowed: INSERT INTO sink SELECT * FROM source"));
1078+
}
1079+
1080+
@Test
1081+
void testReadOnlyModeWithStatementSet() {
1082+
Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
1083+
config.set(SQL_GATEWAY_READ_ONLY_MODE, true);
1084+
1085+
SessionEnvironment sessionEnvironment =
1086+
SessionEnvironment.newBuilder()
1087+
.setSessionEndpointVersion(MockedEndpointVersion.V1)
1088+
.build();
1089+
1090+
SessionHandle sessionHandle = service.openSession(sessionEnvironment);
1091+
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
1092+
String sinkDdl1 = "CREATE TABLE sink1 (a STRING) WITH ('connector'='blackhole');";
1093+
String sinkDdl2 = "CREATE TABLE sink2 (a STRING) WITH ('connector'='blackhole');";
1094+
1095+
service.executeStatement(sessionHandle, sourceDdl, -1, config);
1096+
service.executeStatement(sessionHandle, sinkDdl1, -1, config);
1097+
service.executeStatement(sessionHandle, sinkDdl2, -1, config);
1098+
1099+
service.executeStatement(sessionHandle, "BEGIN STATEMENT SET", -1, config);
1100+
service.executeStatement(
1101+
sessionHandle, "INSERT INTO sink1 SELECT * FROM source", -1, config);
1102+
service.executeStatement(
1103+
sessionHandle, "INSERT INTO sink2 SELECT * FROM source", -1, config);
1104+
1105+
OperationHandle endOpHandle = service.executeStatement(sessionHandle, "END", -1, config);
1106+
1107+
assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, endOpHandle))
1108+
.satisfies(
1109+
anyCauseMatches(
1110+
SqlExecutionException.class,
1111+
"SQL Gateway is in read-only mode. Modification operations are not allowed."));
1112+
}
1113+
10511114
// --------------------------------------------------------------------------------------------
10521115

10531116
private OperationHandle submitDefaultOperation(

0 commit comments

Comments
 (0)