diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index 9bbee82a..d492e33a 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -6,8 +6,16 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.query.result.OperationResult; import tech.ydb.query.settings.BeginTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.FetchScriptSettings; import tech.ydb.table.query.Params; /** @@ -68,6 +76,53 @@ public interface QuerySession extends AutoCloseable { */ QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings); + /** + * Executes a YQL script via the scripting service and returns its result as a completed future. + * + *

This method sends a YQL script for execution and collects the full result set in a single response. + * It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns + * an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.

+ * + * @param query the YQL script text to execute + * @param params input parameters for the script + * @param settings execution settings such as statistics collection or tracing + * @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult} + */ + CompletableFuture> executeScriptYql(String query, + Params params, + ExecuteScriptSettings settings); + + + /** + * Submits a YQL script for asynchronous execution and returns a handle to the operation. + * Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb + * + *

This method executes the given script asynchronously and immediately returns + * a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched + * via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.

+ * + * @param query the YQL script text to execute + * @param params input parameters to pass to the script + * @param settings script execution options such as TTL, statistics mode, or resource pool + * @return a future resolving to an {@link Operation} representing the submitted script execution + */ + CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings); + + /** + * Fetches partial or complete results from a previously executed YQL script. + * + *

This method retrieves result sets produced by an asynchronous script execution. + * It supports incremental fetching using tokens, row limits, and result set index selection.

+ * + * @param query optional query text for context (not used by the server but may help debugging) + * @param params parameters used during script execution (typically empty) + * @param settings settings that define which operation to fetch results from, including fetch token, row limit, and index + * @return a future resolving to a {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} + */ + CompletableFuture> fetchScriptResults(String query, + Params params, + FetchScriptSettings settings); + @Override void close(); @@ -106,4 +161,46 @@ default QueryStream createQuery(String query, TxMode tx) { default CompletableFuture> beginTransaction(TxMode txMode) { return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build()); } + + /** + * Executes a YQL script via the scripting service and returns its result as a completed future. + * + *

This method sends a YQL script for execution and collects the full result set in a single response. + * It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns + * an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.

+ * + * @param query the YQL script text to execute + * @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult} + */ + default CompletableFuture> executeScriptYql(String query) { + return executeScriptYql(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + } + + /** + * Submits a YQL script for asynchronous execution and returns a handle to the operation. + * Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb + * + *

This method executes the given script asynchronously and immediately returns + * a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched + * via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.

+ * + * @param query the YQL script text to execute + * @return a future resolving to an {@link Operation} representing the submitted script execution + */ + default CompletableFuture> executeScript(String query) { + return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + } + + /** + * Waits for a previously submitted script operation to complete. + * + *

This method polls or fetches the state of the running operation via {@link OperationTray#fetchOperation} + * until the operation completes successfully or fails. It is typically used after calling + * {@link #executeScript(String, Params, ExecuteScriptSettings)}.

+ * + * @param scriptFuture a {@link CompletableFuture} returned by {@link #executeScript(String, Params, ExecuteScriptSettings)} + * @return a future resolving to the final {@link Status} of the script execution + */ + CompletableFuture waitForScript(CompletableFuture> scriptFuture); + } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index e1ef7fc8..ee1318ef 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -5,15 +5,25 @@ import io.grpc.Context; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.operation.StatusExtractor; -import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.query.v1.QueryServiceGrpc; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.proto.scripting.v1.ScriptingServiceGrpc; /** + * Low-level RPC client for YDB Query and Scripting services. + *

+ * Provides direct gRPC bindings for session management, query execution, + * transaction control, and script execution APIs. + *

+ * Used internally by higher-level query session and client abstractions. * * @author Aleksandr Gorshenin */ @@ -106,11 +116,52 @@ public GrpcReadStream executeQuery( return transport.readStreamCall(QueryServiceGrpc.getExecuteQueryMethod(), settings, request); } - public CompletableFuture> executeScript( + /** + * Executes a YQL script via the scripting service. + * + * @param request the {@link ScriptingProtos.ExecuteYqlRequest} containing the script definition + * @param settings gRPC request settings + * @return a future resolving to an {@link Operation} with {@link ScriptingProtos.ExecuteYqlResult} + */ + public CompletableFuture>> executeScriptYql( + ScriptingProtos.ExecuteYqlRequest request, GrpcRequestSettings settings) { + + return transport.unaryCall(ScriptingServiceGrpc.getExecuteYqlMethod(), settings, request) + .thenApply(OperationBinder.bindAsync( + transport, + ScriptingProtos.ExecuteYqlResponse::getOperation, + ScriptingProtos.ExecuteYqlResult.class) + ); + } + + /** + * Executes a YQL script using the Query service API. + * + * + * @param request the {@link YdbQuery.ExecuteScriptRequest} containing the script + * @param settings gRPC request settings + * @return a future resolving to an {@link Operation} representing the script execution + */ + public CompletableFuture> executeScript( YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { - return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request); + + return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + op -> op + )); } + /** + * Fetches the results of a previously executed script. + * + *

This method retrieves the next portion of script execution results, + * supporting pagination and partial fetch using tokens.

+ * + * @param request the {@link YdbQuery.FetchScriptResultsRequest} specifying the fetch parameters + * @param settings gRPC request settings + * @return a future resolving to {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} + */ public CompletableFuture> fetchScriptResults( YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index c146018b..14862ff2 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -9,6 +9,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Strings; +import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,14 +23,19 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.proto.table.YdbTable; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; +import tech.ydb.query.result.OperationResult; import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryStats; import tech.ydb.query.settings.AttachSessionSettings; @@ -37,6 +44,8 @@ import tech.ydb.query.settings.CreateSessionSettings; import tech.ydb.query.settings.DeleteSessionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.FetchScriptSettings; import tech.ydb.query.settings.QueryExecMode; import tech.ydb.query.settings.QueryStatsMode; import tech.ydb.query.settings.RollbackTransactionSettings; @@ -182,6 +191,19 @@ private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { } } + private static YdbTable.QueryStatsCollection.Mode mapStatsCollectionMode(QueryStatsMode mode) { + switch (mode) { + case NONE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_NONE; + case BASIC: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_BASIC; + case FULL: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_FULL; + case PROFILE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_PROFILE; + + case UNSPECIFIED: + default: + return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_UNSPECIFIED; + } + } + private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { switch (mode) { case NONE: return YdbQuery.StatsMode.STATS_MODE_NONE; @@ -244,6 +266,79 @@ void handleTxMeta(String txID) { }; } + @Override + public CompletableFuture> executeScriptYql( + String query, + Params params, + ExecuteScriptSettings settings) { + ScriptingProtos.ExecuteYqlRequest.Builder requestBuilder = ScriptingProtos.ExecuteYqlRequest.newBuilder() + .setScript(query) + .setCollectStats(mapStatsCollectionMode(settings.getStatsMode())); + + requestBuilder.putAllParameters(params.toPb()); + + GrpcRequestSettings.Builder options = makeOptions(settings); + + return rpc.executeScriptYql(requestBuilder.build(), options.build()).thenApply(OperationResult::new); + } + + @Override + public CompletableFuture> executeScript(String query, + Params params, + ExecuteScriptSettings settings) { + YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() + .setExecMode(mapExecMode(settings.getExecMode())) + .setStatsMode(mapStatsMode(settings.getStatsMode())) + .setScriptContent(YdbQuery.QueryContent.newBuilder() + .setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1) + .setText(query) + .build()); + + java.time.Duration ttl = settings.getTtl(); + if (ttl != null) { + request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano())); + } + + String resourcePool = settings.getResourcePool(); + if (resourcePool != null && !resourcePool.isEmpty()) { + request.setPoolId(resourcePool); + } + + request.putAllParameters(params.toPb()); + + GrpcRequestSettings.Builder options = makeOptions(settings); + + return rpc.executeScript(request.build(), options.build()); + } + + @Override + public CompletableFuture waitForScript(CompletableFuture> scriptFuture) { + return scriptFuture.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)); + } + + @Override + public CompletableFuture> + fetchScriptResults(String query, Params params, FetchScriptSettings settings) { + YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); + + if (!Strings.isNullOrEmpty(settings.getFetchToken())) { + requestBuilder.setFetchToken(settings.getFetchToken()); + } + + if (settings.getRowsLimit() > 0) { + requestBuilder.setRowsLimit(settings.getRowsLimit()); + } + + requestBuilder.setOperationId(settings.getOperationId()); + + if (settings.getSetResultSetIndex() >= 0) { + requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); + } + + GrpcRequestSettings.Builder options = makeOptions(settings); + return rpc.fetchScriptResults(requestBuilder.build(), options.build()); + } + public CompletableFuture> delete(DeleteSessionSettings settings) { YdbQuery.DeleteSessionRequest request = YdbQuery.DeleteSessionRequest.newBuilder() .setSessionId(sessionId) diff --git a/query/src/main/java/tech/ydb/query/result/OperationResult.java b/query/src/main/java/tech/ydb/query/result/OperationResult.java new file mode 100644 index 00000000..2a07c33d --- /dev/null +++ b/query/src/main/java/tech/ydb/query/result/OperationResult.java @@ -0,0 +1,84 @@ +package tech.ydb.query.result; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import javax.annotation.Nonnull; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.operation.Operation; +import tech.ydb.query.QuerySession; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.table.query.Params; + +/** + * Represents the result of an executed YQL script operation. + *

+ * This class wraps a {@link Operation} that contains a {@link Result} object + * and provides convenient access to both the operation metadata and the + * actual execution result. + *

+ * + *

Typically used as the return type for + * {@link QuerySession#executeScriptYql(String, Params, ExecuteScriptSettings)} + * and similar asynchronous script execution APIs.

+ * + * @param the type of value contained in the result + * + *

Author: Evgeny Kuvardin + */ +public class OperationResult implements Result { + + private final Operation> operation; + private final Result result; + + public OperationResult(Operation> resultOperation) { + this.operation = resultOperation; + this.result = operation.getValue(); + } + + /** + * Returns the underlying {@link Operation} associated with this result. + *

+ * The operation object contains metadata such as operation ID, execution status, + * and timing information. + *

+ * + * @return the wrapped {@link Operation} object + */ + public Operation> getOperation() { + return operation; + } + + @Nonnull + @Override + public Status getStatus() { + return result.getStatus(); + } + + @Nonnull + @Override + public T getValue() throws UnexpectedResultException { + return result.getValue(); + } + + @Nonnull + @Override + public Result map(@Nonnull Function mapper) { + return result.map(mapper); + } + + @Nonnull + @Override + public CompletableFuture> mapResultFuture(@Nonnull Function>> mapper) { + return result.mapResultFuture(mapper); + } + + @Nonnull + @Override + public CompletableFuture mapStatusFuture(@Nonnull Function> mapper) { + return result.mapStatusFuture(mapper); + } +} diff --git a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java new file mode 100644 index 00000000..214dc994 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java @@ -0,0 +1,153 @@ +package tech.ydb.query.settings; + +import java.time.Duration; + +import tech.ydb.core.settings.BaseRequestSettings; + +/** + * Settings for configuring script execution requests. + *

+ * Used by {@code QuerySession.executeScript(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin + */ +public class ExecuteScriptSettings extends BaseRequestSettings { + private final QueryExecMode execMode; + private final QueryStatsMode statsMode; + private final String resourcePool; + private final Duration ttl; + + private ExecuteScriptSettings(Builder builder) { + super(builder); + this.execMode = builder.execMode; + this.statsMode = builder.statsMode; + this.ttl = builder.ttl; + this.resourcePool = builder.resourcePool; + } + + /** + * Returns the execution mode for the script. + * + *

Defines how the script should be processed, e.g. executed, explained, validated, or parsed.

+ * + * @return the {@link QueryExecMode} used for execution + */ + public QueryExecMode getExecMode() { + return this.execMode; + } + + /** + * Returns the time-to-live (TTL) duration for the script results. + * + *

Specifies how long results of the executed script will be kept available + * before automatic cleanup on the server.

+ * + * @return the TTL value, or {@code null} if not set + */ + public Duration getTtl() { + return ttl; + } + + /** + * Returns the statistics collection mode for script execution. + * + *

Determines how detailed execution statistics should be gathered + * (none, basic, full, or profiling level).

+ * + * @return the {@link QueryStatsMode} used for statistics collection + */ + public QueryStatsMode getStatsMode() { + return this.statsMode; + } + + /** + * Returns the name of the resource pool assigned to the script execution. + * + *

Resource pools define isolated resource groups for workload management. + * If not specified, the default pool is used.

+ * + * @return the resource pool name, or {@code null} if not set + */ + public String getResourcePool() { + return this.resourcePool; + } + + + /** + * Creates a new {@link Builder} instance for constructing {@link ExecuteScriptSettings}. + * + * @return a new builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating immutable {@link ExecuteScriptSettings} instances. + *

+ * Provides fluent configuration for script execution settings + */ + public static class Builder extends BaseBuilder { + private QueryExecMode execMode = QueryExecMode.EXECUTE; + private QueryStatsMode statsMode = QueryStatsMode.NONE; + private String resourcePool = null; + private Duration ttl = null; + + /** + * Sets the execution mode for the script. + * + * @param mode the desired execution mode + * @return this builder instance for chaining + * @see QueryExecMode + */ + public Builder withExecMode(QueryExecMode mode) { + this.execMode = mode; + return this; + } + + /** + * Sets the statistics collection mode for the script execution. + * + * @param mode the desired statistics mode + * @return this builder instance for chaining + * @see QueryStatsMode + */ + public Builder withStatsMode(QueryStatsMode mode) { + this.statsMode = mode; + return this; + } + + /** + * Sets the time-to-live (TTL) duration for script results. + * + *

After this duration expires, stored script results may be deleted + * from the server automatically.

+ * + * @param value the TTL duration + * @return this builder instance for chaining + */ + public Builder withTtl(Duration value) { + this.ttl = value; + return this; + } + + /** + * Specifies the resource pool to use for query execution. + *

+ * If no pool is specified, or the ID is empty, or equal to {@code "default"}, + * the unremovable resource pool "default" will be used. + * + * @param poolId resource pool identifier + * @return this builder instance for chaining + */ + public Builder withResourcePool(String poolId) { + this.resourcePool = poolId; + return this; + } + + @Override + public ExecuteScriptSettings build() { + return new ExecuteScriptSettings(this); + } + } +} diff --git a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java new file mode 100644 index 00000000..cd33de38 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java @@ -0,0 +1,112 @@ +package tech.ydb.query.settings; + +import tech.ydb.core.settings.BaseRequestSettings; + +/** + * Settings for configuring the fetch phase of a previously executed YQL script. + *

+ * These settings define which operation results to fetch, pagination options, + * row limits, and which result set index to retrieve. + * Used with {@code QuerySession.fetchScriptResults(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin + */ +public class FetchScriptSettings extends BaseRequestSettings { + private final String operationId; + private final String fetchToken; + private final int rowsLimit; + private final int setResultSetIndex; + + private FetchScriptSettings(Builder builder) { + super(builder); + this.operationId = builder.operationId; + this.fetchToken = builder.fetchToken; + this.rowsLimit = builder.rowsLimit; + this.setResultSetIndex = builder.setResultSetIndex; + } + + /** + * Returns the identifier of the operation whose results should be fetched. + * + *

This ID corresponds to the operation returned by + * {@code QuerySession.executeScript(...)} or a similar asynchronous call.

+ * + * @return the operation ID string + */ + public String getOperationId() { + return operationId; + } + + /** + * Returns the fetch token used to continue fetching paginated results. + * + *

When a previous fetch request indicates more data is available, + * this token can be used to retrieve the next portion of results.

+ * + * @return the fetch token, or an empty string if not set + */ + public String getFetchToken() { + return fetchToken; + } + + /** + * Returns the maximum number of rows to retrieve in this fetch request. + * + *

If not set , the server will use its default limit.

+ * + * @return the maximum number of rows to fetch + */ + public int getRowsLimit() { + return rowsLimit; + } + + /** + * Returns the index of the result set to fetch from the executed script. + * + *

When the executed script produces multiple result sets, + * this value specifies which one to retrieve (starting from 0).

+ * + * @return the result set index + */ + public int getSetResultSetIndex() { + return setResultSetIndex; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends BaseBuilder { + + private int rowsLimit = 0; + private int setResultSetIndex = 0; + private String operationId = ""; + private String fetchToken = ""; + + @Override + public FetchScriptSettings build() { + return new FetchScriptSettings(this); + } + + public Builder withEOperationId(String operationId) { + this.operationId = operationId; + return this; + } + + public Builder withFetchToken(String fetchToken) { + this.fetchToken = fetchToken; + return this; + } + + public Builder withRowsLimit(int rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + public Builder withSetResultSetIndex(int setResultSetIndex) { + this.setResultSetIndex = setResultSetIndex; + return this; + } + + } +} diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index cbecca71..34ec9d4b 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -10,6 +10,7 @@ import java.util.function.Function; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; import com.google.common.base.Preconditions; @@ -19,6 +20,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.operation.Operation; import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -70,6 +72,13 @@ public CompletableFuture supplyStatus(Function> supplyOperation(Function>> fn) { + RetryableOperationTask task = new RetryableOperationTask(fn); + task.requestSession(); + return task.getFuture(); + } + private boolean canRetry(StatusCode code) { return code.isRetryable(idempotent) || (retryNotFound && code == StatusCode.NOT_FOUND); } @@ -285,6 +294,82 @@ Status toFailedResult(Result sessionResult) { } } + /** + * RETRYABLE OPERATION TASK + */ + private final class RetryableOperationTask extends BaseRetryableTask> { + RetryableOperationTask(Function>> fn) { + super(fn); + } + + @Override + StatusCode toStatusCode(Operation result) { + if (result.getValue() == null) { + return StatusCode.SUCCESS; + } + return result.getValue().getCode(); + } + + @Override + Operation toFailedResult(Result sessionResult) { + return new FailedOperationTask<>(null, sessionResult.getStatus()); + } + + /** + * Failed Operation task which will be return in case of fail + * @param - type of the operation result + */ + class FailedOperationTask implements Operation { + + final Status status; + final R value; + + FailedOperationTask(@Nullable R value, Status status) { + this.status = status; + this.value = value; + } + + @Nullable + @Override + public String getId() { + return ""; + } + + @Override + public boolean isReady() { + return false; + } + + @Nullable + @Override + public R getValue() { + return value; + } + + @Override + public CompletableFuture cancel() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture forget() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture> fetch() { + return CompletableFuture.completedFuture(Result.fail(status)); + } + + @Override + public Operation transform(Function mapper) { + return new FailedOperationTask<>(mapper.apply(value), status); + } + + + } + } + /** * BUILDER */ diff --git a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java new file mode 100644 index 00000000..db4d6882 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java @@ -0,0 +1,390 @@ +package tech.ydb.query; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; +import tech.ydb.proto.ValueProtos; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.FetchScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.PrimitiveValue; +import tech.ydb.table.values.StructType; +import tech.ydb.test.junit4.GrpcTransportRule; + + +/** + * Integration tests that validate the execution of YQL scripts + * using the YDB Query API and scripting features. + * + *

Tests cover: + *

    + *
  • Script execution with and without parameters
  • + *
  • Error handling in scripts
  • + *
  • Sequential script execution
  • + *
  • Fetching results from executed scripts
  • + *
+ * + *

Author: Evgeny Kuvardin + */ +public class ScriptExampleTest { + + @ClassRule + public final static GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static QueryClient client; + private static SessionRetryContext retryCtx; + + // Create type for struct of series + StructType seriesType = StructType.of( + "series_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "release_date", PrimitiveType.Date, + "series_info", PrimitiveType.Text + ); + // Create and fill list of series + ListValue seriesData = ListType.of(seriesType).newValue( + TestExampleData.SERIES.stream().map(series -> seriesType.newValue( + "series_id", PrimitiveValue.newUint64(series.seriesID()), + "title", PrimitiveValue.newText(series.title()), + "series_info", PrimitiveValue.newText(series.seriesInfo()), + "release_date", PrimitiveValue.newDate(series.releaseDate()) + )).collect(Collectors.toList()) + ); + + // Create type for struct of season + StructType seasonType = StructType.of( + "series_id", PrimitiveType.Uint64, + "season_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "first_aired", PrimitiveType.Date, + "last_aired", PrimitiveType.Date + ); + // Create and fill list of seasons + ListValue seasonsData = ListType.of(seasonType).newValue( + TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( + "series_id", PrimitiveValue.newUint64(season.seriesID()), + "season_id", PrimitiveValue.newUint64(season.seasonID()), + "title", PrimitiveValue.newText(season.title()), + "first_aired", PrimitiveValue.newDate(season.firstAired()), + "last_aired", PrimitiveValue.newDate(season.lastAired()) + )).collect(Collectors.toList()) + ); + + @BeforeClass + public static void init() { + client = QueryClient.newClient(ydbRule) + .sessionPoolMaxSize(5) + .build(); + retryCtx = SessionRetryContext.create(client).build(); + + Assert.assertNotNull(client.getScheduler()); + } + + @After + public void clean() { + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) + .join(); + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) + .join(); + } + + @AfterClass + public static void cleanAll() { + client.close(); + } + + @Test + public void createScript() { + Status status = runCreateSuccessScript(); + Assert.assertTrue(status.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + QueryReader result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join().getValue(); + + ResultSetReader rs = result.getResultSet(0); + + // Check that table exists and contains no data + Assert.assertFalse(rs.next()); + } + + /** + * Ensures that script execution fails when it contains syntax errors. + *

+ * Attempts to execute a malformed YQL script and verifies that the result + * indicates failure. + */ + @Test + public void createScriptShouldFail() { + Status statusOperation = runCreateScript("CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "ZCREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + + Assert.assertFalse(statusOperation.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + + // Check that table exists and contains no data + Assert.assertFalse(result.isSuccess()); + } + + /** + * Verifies creation and data insertion using the {@link ScriptingProtos.ExecuteYqlResult} proto interface. + *

+ * Creates the necessary tables, inserts test data via declared parameters, + * and validates that the data was successfully persisted. + */ + @Test + public void createInsertYqlScript() { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + // .withExecMode(QueryExecMode.EXECUTE) + .build(); + + retryCtx.supplyResult(session -> session.executeScriptYql("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) + ).join(); + + String query + = "SELECT series_id " + + "FROM seasons WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); + } + + /** + * Validates sequential script execution using QueryClient.executeScript. + *

+ * Creates tables, then inserts data in a separate script execution, and + * verifies data persistence. + */ + @Test + public void createInsertQueryScript() { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .withTtl(Duration.ofSeconds(10)) + .build(); + + CompletableFuture> test = retryCtx.supplyOperation(querySession -> querySession.executeScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) + ); + + + retryCtx.supplyStatus( + ss -> + test.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) + ).join(); + + String query + = "SELECT series_id " + + "FROM seasons WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); + } + + /** + * Tests fetching results from an executed script using {@link FetchScriptSettings}. + * + *

Scenario: + *

    + *
  1. Create tables
  2. + *
  3. Insert sample data via parameterized script
  4. + *
  5. Fetch the result set from the executed operation
  6. + *
+ * + * @throws ExecutionException if the script future fails + * @throws InterruptedException if the fetch operation is interrupted + */ + @Test + public void fetchScript() throws ExecutionException, InterruptedException { + runCreateSuccessScript(); + + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + CompletableFuture> updateScript = + retryCtx.supplyOperation(querySession -> querySession.executeScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings)); + + retryCtx.supplyStatus( + ss -> + updateScript.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) + ).join(); + + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(updateScript.get().getId()) + .withFetchToken("") + .build(); + + YdbQuery.FetchScriptResultsResponse rs = checkFetch(fetchScriptSettings1, 1); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(updateScript.get().getId()) + .withFetchToken(rs.getNextFetchToken()) + .build(); + + checkFetch(fetchScriptSettings2, 2); + } + + private YdbQuery.FetchScriptResultsResponse checkFetch(FetchScriptSettings fetchScriptSettings, int value) { + Result test = retryCtx.supplyResult( + session -> session.fetchScriptResults("" + + "SELECT season_id FROM seasons;", + Params.empty(), fetchScriptSettings) + ).join(); + + ValueProtos.ResultSet resultSet = test.getValue().getResultSet(); + Assert.assertEquals(1, resultSet.getRowsCount()); + + ResultSetReader reader = ProtoValueReaders.forResultSet(resultSet); + reader.next(); + Assert.assertEquals(value, reader.getColumn(0).getUint64()); + return test.getValue(); + } + + private Status runCreateSuccessScript() { + return runCreateScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + } + + private Status runCreateScript(String query) { + return retryCtx.supplyStatus( + querySession -> querySession.waitForScript( + retryCtx.supplyOperation(ss -> querySession.executeScript(query)))).join(); + } +}