Skip to content

Commit

Permalink
fix: Ensure Flight SQL acquires shared lock (#6462)
Browse files Browse the repository at this point in the history
This ensures that the Flight SQL server acquires the shared lock when
executing queries, which is necessary when performing some table
operations against refreshing tables.
  • Loading branch information
devinrsmith authored Dec 11, 2024
1 parent 2da9dd5 commit 12b6c9c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.util.ColumnHolder;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
import io.deephaven.extensions.barrage.util.BarrageUtil;
Expand All @@ -33,10 +35,11 @@
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.util.ByteHelper;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.ParentsVisitor;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.server.auth.AuthorizationProvider;
import io.deephaven.server.console.ScopeTicketResolver;
import io.deephaven.server.session.ActionResolver;
import io.deephaven.server.session.CommandResolver;
import io.deephaven.server.session.SessionState;
Expand Down Expand Up @@ -92,6 +95,7 @@
import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -165,10 +169,6 @@ public final class FlightSqlResolver implements ActionResolver, CommandResolver
@VisibleForTesting
static final Schema DATASET_SCHEMA_SENTINEL = new Schema(List.of(Field.nullable("DO_NOT_USE", Utf8.INSTANCE)));

// Unable to depends on TicketRouter, would be a circular dependency atm (since TicketRouter depends on all the
// TicketResolvers).
// private final TicketRouter router;
private final ScopeTicketResolver scopeTicketResolver;
private final Scheduler scheduler;
private final Authorization authorization;
private final KeyedObjectHashMap<ByteString, QueryBase> queries;
Expand All @@ -177,10 +177,8 @@ public final class FlightSqlResolver implements ActionResolver, CommandResolver
@Inject
public FlightSqlResolver(
final AuthorizationProvider authProvider,
final ScopeTicketResolver scopeTicketResolver,
final Scheduler scheduler) {
this.authorization = Objects.requireNonNull(authProvider.getTicketResolverAuthorization());
this.scopeTicketResolver = Objects.requireNonNull(scopeTicketResolver);
this.scheduler = Objects.requireNonNull(scheduler);
this.queries = new KeyedObjectHashMap<>(QUERY_KEY);
this.preparedStatements = new KeyedObjectHashMap<>(PREPARED_STATEMENT_KEY);
Expand Down Expand Up @@ -642,26 +640,82 @@ interface TicketHandlerReleasable extends TicketHandler {
void release();
}

private Table executeSqlQuery(SessionState session, String sql) {
// See SQLTODO(catalog-reader-implementation)
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
// noinspection unchecked,rawtypes
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null);
// Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not
// io.deephaven.auth.ServiceAuthWiring
// TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic
private Table executeSqlQuery(String sql) {
final ExecutionContext executionContext = ExecutionContext.getContext();
final QueryScope queryScope = executionContext.getQueryScope();
// We aren't managing the liveness of Tables that come verbatim (authorization un-transformed) from the query
// scope (we are ensuring that any transformed, or operation created, tables don't escape to a higher-layer's
// liveness scope). In the case where they either are already not live, or become not live by the time the
// operation logic is executed, an appropriate exception will be thrown. While this is a liveness race, it isn't
// technically much different than a liveness race possible via ScopeTicketResolver.resolve.
//
// The proper way to do this would be to re-model the table execution logic of GrpcTableOperation (gRPC) into a
// QST form, whereby table dependencies are presented as properly-scoped, liveness-managed Exports for the
// duration of the operation.
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
final Table table = tableSpec.logic()
.create(new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, scopeTicketResolver, session));
if (table.isRefreshing()) {
table.retainReference();
// See SQLTODO(catalog-reader-implementation)
// Unfortunately, we must do authorization.transform on all the query scope tables up-front; to make this
// on-demand, we need to implement a Calcite Catalog Reader (non-trivial). Technically, parseSql only needs
// to know the definitions of the tables; we could consider a table-specific authorization interface that
// presents the specialization Authorization.transformedDefinition(Table) to make this cheaper in a lot of
// cases.
final Map<String, Table> queryScopeTables =
queryScope.toMap(o -> queryScopeAuthorizedTableMapper(queryScope, o), (n, t) -> t != null);
final TableSpec tableSpec =
Sql.parseSql(sql, queryScopeTables, TableCreatorScopeTickets::ticketTable, null);
final TableCreator<Table> tableCreator =
new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, queryScopeTables);
// We could consider doing finer-grained sharedLock in the future; right now, taking it for the whole
// operation if any of the TicketTable sources are refreshing.
final List<Table> refreshingTables = new ArrayList<>();
for (final TableSpec node : ParentsVisitor.reachable(List.of(tableSpec))) {
// Of the source tables, SQL can produce a NewTable or a TicketTable (until we introduce custom
// functions, where we could conceivable have it produce EmptyTable, TimeTable, etc).
if (!(node instanceof TicketTable)) {
continue;
}
final Table sourceTable = tableCreator.of((TicketTable) node);
if (sourceTable.isRefreshing()) {
refreshingTables.add(sourceTable);
}
}
final UpdateGraph updateGraph = refreshingTables.isEmpty()
? null
: NotificationQueue.Dependency.getUpdateGraph(null, refreshingTables.toArray(new Table[0]));
// Note: Authorization.transform has already been performed, but we are _not_ doing
// io.deephaven.auth.ServiceAuthWiring checks.
// TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization
// logic
try (
final SafeCloseable ignored0 =
updateGraph == null ? null : executionContext.withUpdateGraph(updateGraph).open();
final SafeCloseable ignored1 =
updateGraph == null ? null : updateGraph.sharedLock().lockCloseable()) {
final Table table = tableSpec.logic().create(tableCreator);
if (table.isRefreshing()) {
table.retainReference();
}
return table;
}
return table;
}
}

private Table queryScopeTableMapper(QueryScope queryScope, Object object) {
if (object == null) {
return null;
}
object = queryScope.unwrapObject(object);
if (!(object instanceof Table)) {
return null;
}
return (Table) object;
}

private Table queryScopeAuthorizedTableMapper(QueryScope queryScope, Object object) {
final Table table = queryScopeTableMapper(queryScope, object);
return table == null ? null : authorization.transform(table);
}

/**
* This is the base class for "easy" commands; that is, commands that have a fixed schema and are cheap to
* initialize.
Expand Down Expand Up @@ -841,7 +895,7 @@ private synchronized QueryBase<C> executeImpl(C command) {

protected void executeSql(String sql) {
try {
table = executeSqlQuery(session, sql);
table = executeSqlQuery(sql);
} catch (SqlParseException e) {
throw error(Code.INVALID_ARGUMENT, "query can't be parsed", e);
} catch (UnsupportedSqlOperation e) {
Expand Down Expand Up @@ -1319,8 +1373,11 @@ private Table getTablesEmpty(boolean includeSchema, Map<String, Object> attribut
private Table getTables(boolean includeSchema, QueryScope queryScope, Map<String, Object> attributes,
Predicate<String> tableNameFilter) {
Objects.requireNonNull(attributes);
final Map<String, Table> queryScopeTables =
(Map<String, Table>) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table);
// Note: _not_ using queryScopeAuthorizedTable mapper; we can have a more efficient implementation when
// !includeSchema that only needs to check authorization.isDeniedAccess.
final Map<String, Table> queryScopeTables = queryScope.toMap(
o -> queryScopeTableMapper(queryScope, o),
(tableName, table) -> table != null && tableNameFilter.test(tableName));
final int size = queryScopeTables.size();
final String[] catalogNames = new String[size];
final String[] dbSchemaNames = new String[size];
Expand All @@ -1330,9 +1387,6 @@ private Table getTables(boolean includeSchema, QueryScope queryScope, Map<String
int count = 0;
for (Entry<String, Table> e : queryScopeTables.entrySet()) {
final String tableName = e.getKey();
if (!tableNameFilter.test(tableName)) {
continue;
}
final Schema schema;
if (includeSchema) {
final Table table = authorization.transform(e.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,35 @@
//
package io.deephaven.server.flightsql;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.Table;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.TableCreatorDelegate;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.server.console.ScopeTicketResolver;
import io.deephaven.server.session.SessionState;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

final class TableCreatorScopeTickets extends TableCreatorDelegate<Table> {

private final ScopeTicketResolver scopeTicketResolver;
private final SessionState session;
static TicketTable ticketTable(String variableName) {
return TicketTable.fromQueryScopeField(variableName);
}

private final Map<String, Table> map;

TableCreatorScopeTickets(TableCreator<Table> delegate, ScopeTicketResolver scopeTicketResolver,
SessionState session) {
TableCreatorScopeTickets(TableCreator<Table> delegate, Map<String, Table> map) {
super(delegate);
this.scopeTicketResolver = Objects.requireNonNull(scopeTicketResolver);
this.session = session;
this.map = Objects.requireNonNull(map);
}

@Override
public Table of(TicketTable ticketTable) {
return scopeTicketResolver.<Table>resolve(session, ByteBuffer.wrap(ticketTable.ticket()),
TableCreatorScopeTickets.class.getSimpleName()).get();
final byte[] ticket = ticketTable.ticket();
Assert.gt(ticket.length, "ticket.length", 2);
Assert.eq(ticket[0], "ticket[0]", (byte) 's');
Assert.eq(ticket[1], "ticket[1]", (byte) '/');
final String variableName = new String(ticket, 2, ticket.length - 2);
return Objects.requireNonNull(map.get(variableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dagger.Component;
import dagger.Module;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
Expand Down Expand Up @@ -86,6 +87,8 @@
import javax.inject.Singleton;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -893,6 +896,55 @@ public void unknownAction() {
actionNoResolver(() -> doAction(action), type);
}

@Test
public void refreshingTableTest() throws Exception {
// Set a start time so we can test we get out the expected number of rows
final Instant startTime = Instant.now().minus(Duration.ofHours(1));
final Table tt1 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(1))
.build()
.view("Timestamp1=Timestamp", "Id=ii % 11")
.lastBy("Id");
final Table tt2 = TableTools.timeTableBuilder()
.startTime(startTime)
.period(Duration.ofSeconds(5))
.build()
.view("Timestamp2=Timestamp", "Id=ii % 11")
.lastBy("Id");
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
queryScope.putParam("my_table_1", tt1);
queryScope.putParam("my_table_2", tt2);
try {
final String query = "SELECT\n" +
" my_table_1.Id,\n" +
" my_table_1.Timestamp1,\n" +
" my_table_2.Timestamp2\n" +
"FROM\n" +
" my_table_1\n" +
" INNER JOIN my_table_2 ON my_table_1.Id = my_table_2.Id";
{
final FlightInfo info = flightSqlClient.execute(query);
consume(info, 1, 11, false);
}
{
final PreparedStatement prepared = flightSqlClient.prepare(query);
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
{
final FlightInfo info = prepared.execute();
consume(info, 1, 11, false);
}
}
} finally {
queryScope.putParam("my_table_2", null);
queryScope.putParam("my_table_1", null);
}

}

private Result doAction(Action action) {
final Iterator<Result> it = flightClient.doAction(action);
if (!it.hasNext()) {
Expand Down

0 comments on commit 12b6c9c

Please sign in to comment.