Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/content/docs/dev/table/sql-gateway/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ $ ./sql-gateway -Dkey=value
<td>Duration</td>
<td>Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.</td>
</tr>
<tr>
<td><h5>sql-gateway.read-only</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.</td>
</tr>
<tr>
<td><h5>sql-gateway.worker.threads.max</h5></td>
<td style="word-wrap: break-word;">500</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,11 @@ public class SqlGatewayServiceConfigOptions {
.withDescription(
"Keepalive time for an idle worker thread. When the number of workers exceeds min workers, "
+ "excessive threads are killed after this time interval.");

public static final ConfigOption<Boolean> SQL_GATEWAY_READ_ONLY_MODE =
key("sql-gateway.read-only")
.booleanType()
.defaultValue(false)
.withDescription(
"When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.TableInfo;
Expand Down Expand Up @@ -520,13 +521,13 @@ private ResultFetcher executeOperation(
"No Statement Set to submit. 'END' statement should be used after 'BEGIN STATEMENT SET'.");
} else if (op instanceof ModifyOperation) {
return callModifyOperations(
tableEnv, handle, Collections.singletonList((ModifyOperation) op));
tableEnv, handle, Collections.singletonList((ModifyOperation) op), statement);
} else if (op instanceof CompileAndExecutePlanOperation
|| op instanceof ExecutePlanOperation) {
return callExecuteOperation(tableEnv, handle, op);
} else if (op instanceof StatementSetOperation) {
return callModifyOperations(
tableEnv, handle, ((StatementSetOperation) op).getOperations());
tableEnv, handle, ((StatementSetOperation) op).getOperations(), statement);
} else if (op instanceof QueryOperation) {
TableResultInternal result =
cachedPlan != null
Expand Down Expand Up @@ -673,14 +674,30 @@ private ResultFetcher callEndStatementSetOperation(
// there's no statement in the statement set, skip submitting
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
} else {
return callModifyOperations(tableEnv, handle, statementSetOperations);
return callModifyOperations(tableEnv, handle, statementSetOperations, null);
}
}

private ResultFetcher callModifyOperations(
TableEnvironmentInternal tableEnv,
OperationHandle handle,
List<ModifyOperation> modifyOperations) {
List<ModifyOperation> modifyOperations,
String originalSql) {
// Check if SQL Gateway is in read-only mode
Configuration configuration = sessionContext.getSessionConf().clone();
configuration.addAll(executionConfig);
boolean isReadOnlyMode =
configuration.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE);

if (isReadOnlyMode) {
String errorMessage =
(originalSql != null && !originalSql.trim().isEmpty())
? String.format(
"The following statement is not allowed: %s", originalSql)
: "Modification operations are not allowed.";
throw new SqlExecutionException("SQL Gateway is in read-only mode. " + errorMessage);
}

TableResultInternal result = tableEnv.executeInternal(modifyOperations);
// DeleteFromFilterOperation doesn't have a JobClient
if (modifyOperations.size() == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
import static org.apache.flink.table.functions.FunctionKind.OTHER;
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE;
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
Expand Down Expand Up @@ -1048,6 +1049,68 @@ void testGetOperationSchemaWhenOperationGetError() throws Exception {
.satisfies(anyCauseMatches(SqlGatewayException.class, msg)));
}

@Test
void testReadOnlyModeWithModificationOperations() {
Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
config.set(SQL_GATEWAY_READ_ONLY_MODE, true);

SessionEnvironment sessionEnvironment =
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.build();

SessionHandle sessionHandle = service.openSession(sessionEnvironment);
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";

service.executeStatement(sessionHandle, sourceDdl, -1, config);
service.executeStatement(sessionHandle, sinkDdl, -1, config);

OperationHandle insertOperationHandle =
service.executeStatement(
sessionHandle, "INSERT INTO sink SELECT * FROM source;", -1, config);

assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, insertOperationHandle))
.satisfies(
anyCauseMatches(
SqlExecutionException.class,
"SQL Gateway is in read-only mode. The following statement is not allowed: INSERT INTO sink SELECT * FROM source"));
}

@Test
void testReadOnlyModeWithStatementSet() {
Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
config.set(SQL_GATEWAY_READ_ONLY_MODE, true);

SessionEnvironment sessionEnvironment =
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.build();

SessionHandle sessionHandle = service.openSession(sessionEnvironment);
String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
String sinkDdl1 = "CREATE TABLE sink1 (a STRING) WITH ('connector'='blackhole');";
String sinkDdl2 = "CREATE TABLE sink2 (a STRING) WITH ('connector'='blackhole');";

service.executeStatement(sessionHandle, sourceDdl, -1, config);
service.executeStatement(sessionHandle, sinkDdl1, -1, config);
service.executeStatement(sessionHandle, sinkDdl2, -1, config);

service.executeStatement(sessionHandle, "BEGIN STATEMENT SET", -1, config);
service.executeStatement(
sessionHandle, "INSERT INTO sink1 SELECT * FROM source", -1, config);
service.executeStatement(
sessionHandle, "INSERT INTO sink2 SELECT * FROM source", -1, config);

OperationHandle endOpHandle = service.executeStatement(sessionHandle, "END", -1, config);

assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, endOpHandle))
.satisfies(
anyCauseMatches(
SqlExecutionException.class,
"SQL Gateway is in read-only mode. Modification operations are not allowed."));
}

// --------------------------------------------------------------------------------------------

private OperationHandle submitDefaultOperation(
Expand Down