diff --git a/src/errors.ts b/src/errors.ts index e185589c..1cef9d4b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -315,9 +315,9 @@ export class ClientResourceExhausted extends TransportError { public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true); } -export class ClientCancelled extends TransportError { +export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying static status = StatusCode.CLIENT_CANCELED; - public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false); + public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false); } const TRANSPORT_ERROR_CODES = new Map([ diff --git a/src/retries_obsoleted.ts b/src/retries_obsoleted.ts index 2f1a3486..139f6107 100644 --- a/src/retries_obsoleted.ts +++ b/src/retries_obsoleted.ts @@ -59,7 +59,7 @@ const RETRYABLE_ERRORS_FAST = [ ]; const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted]; -class RetryStrategy { +export class RetryStrategy { // private logger: Logger; constructor( public methodName = 'UnknownClass::UnknownMethod', diff --git a/src/table/table-session.ts b/src/table/table-session.ts index eb70808b..a36fe92a 100644 --- a/src/table/table-session.ts +++ b/src/table/table-session.ts @@ -21,7 +21,7 @@ import * as grpc from "@grpc/grpc-js"; import EventEmitter from "events"; import {ICreateSessionResult, SessionEvent, TableService} from "./table-session-pool"; import {Endpoint} from "../discovery"; -import {retryable} from "../retries_obsoleted"; +import {retryable, RetryParameters, RetryStrategy} from "../retries_obsoleted"; import {MissingStatus, MissingValue, SchemeError, YdbError} from "../errors"; import {ResponseMetadataKeys} from "../constants"; import {pessimizable} from "../utils"; @@ -171,15 +171,21 @@ export class PrepareQuerySettings extends OperationParamsSettings { } export class ExecuteQuerySettings extends OperationParamsSettings { - keepInCache: boolean = false; + keepInCache?: boolean = false; collectStats?: Ydb.Table.QueryStatsCollection.Mode; onResponseMetadata?: (metadata: grpc.Metadata) => void; + idempotent: boolean = false; withKeepInCache(keepInCache: boolean) { this.keepInCache = keepInCache; return this; } + withIdempotent(idempotent: boolean) { + this.idempotent = idempotent; + return this; + } + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { this.collectStats = collectStats; return this; @@ -258,6 +264,8 @@ export class ExecuteScanQuerySettings { } } +let executeQueryRetryer: RetryStrategy; + export class TableSession extends EventEmitter implements ICreateSessionResult { private beingDeleted = false; private free = true; @@ -518,7 +526,13 @@ export class TableSession extends EventEmitter implements ICreateSessionResult { if (keepInCache) { request.queryCachePolicy = {keepInCache}; } - const response = await this.api.executeDataQuery(request); + + if (!executeQueryRetryer) executeQueryRetryer = new RetryStrategy('TableSession:executeQuery', new RetryParameters(), this.logger); + + const response = + settings?.idempotent + ? await executeQueryRetryer.retry(() => this.api.executeDataQuery(request)) + : await this.api.executeDataQuery(request); const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata)); return ExecuteQueryResult.decode(payload); } diff --git a/src/utils/test/create-table.ts b/src/utils/test/create-table.ts index 3564c406..a20ab94e 100644 --- a/src/utils/test/create-table.ts +++ b/src/utils/test/create-table.ts @@ -1,5 +1,5 @@ -import {Column, TableDescription, TableSession} from "../../table"; -import {withRetries} from "../../retries_obsoleted"; +import {AUTO_TX, Column, ExecuteQuerySettings, TableDescription, TableSession} from "../../table"; +// import {withRetries} from "../../retries_obsoleted"; import {Types} from "../../types"; import {Row} from "./row"; @@ -29,10 +29,23 @@ DECLARE $data AS List>; REPLACE INTO ${TABLE} SELECT * FROM AS_TABLE($data);`; - await withRetries(async () => { - const preparedQuery = await session.prepareQuery(query); - await session.executeQuery(preparedQuery, { + // Now we can specify that the operation should be repeated in case of an error by specifying that it is idempotent + + // Old code: + + // await withRetries(async () => { + // const preparedQuery = await session.prepareQuery(query); + // await session.executeQuery(preparedQuery, { + // '$data': Row.asTypedCollection(rows), + // }); + // }); + + // New code variant: + + const preparedQuery = await session.prepareQuery(query); + await session.executeQuery(preparedQuery, { '$data': Row.asTypedCollection(rows), - }); - }); + }, + AUTO_TX, + new ExecuteQuerySettings().withIdempotent(true)); }