diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da468104..b7a75252 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,6 @@ jobs: services: ydb: - # image: ghcr.io/ydb-platform/local-ydb:nightly image: ydbplatform/local-ydb:24.1 ports: - 2135:2135 diff --git a/CHANGELOG.md b/CHANGELOG.md index 4453dac1..685dd8d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,27 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## [5.4.0](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.5...v5.4.0) (2024-09-27) + + +### Features + +* add idempotent option to tableClient session.executeQuery ([89df57e](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/89df57eeb39a322af7dfbe84ce25c20549cce70b)) + +## [5.3.5](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.4...v5.3.5) (2024-09-26) + + +### Bug Fixes + +* "Call cancelled" error ([ee14b9f](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/ee14b9f386059a97b41ebe417fab905ea72b18ba)) + +## [5.3.4](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.3...v5.3.4) (2024-08-20) + + +### Bug Fixes + +* initial version of topic service client ([2db4f42](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/2db4f424f8da9f8751a7efe847eb8cad3a02633d)) + ## [5.3.3](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.2...v5.3.3) (2024-07-04) diff --git a/examples/basic-example-v2-with-query-service/index.ts b/examples/basic-example-v2-with-query-service/index.ts index ac94e875..e3170c41 100644 --- a/examples/basic-example-v2-with-query-service/index.ts +++ b/examples/basic-example-v2-with-query-service/index.ts @@ -156,8 +156,7 @@ async function selectWithParameters(driver: Driver, data: ThreeIds[], logger: Lo DECLARE $seasonId AS Uint64; DECLARE $episodeId AS Uint64; - SELECT title,| - air_date + SELECT title, air_date FROM episodes WHERE series_id = $seriesId AND season_id = $seasonId diff --git a/package-lock.json b/package-lock.json index 2957cd74..0e97687b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ydb-sdk", - "version": "5.3.3", + "version": "5.4.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "ydb-sdk", - "version": "5.3.3", + "version": "5.4.0", "license": "Apache", "dependencies": { "@grpc/grpc-js": "^1.5.3", diff --git a/package.json b/package.json index b5083ecb..78ec0607 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ydb-sdk", - "version": "5.3.3", + "version": "5.4.0", "description": "Node.js bindings for working with YDB API over gRPC", "main": "build/cjs/src/index.js", "module": "build/esm/src/index.js", diff --git a/src/__tests__/e2e/connection.test.ts b/src/__tests__/e2e/connection.test.ts index 8d1e75f7..bf6b6042 100644 --- a/src/__tests__/e2e/connection.test.ts +++ b/src/__tests__/e2e/connection.test.ts @@ -1,6 +1,8 @@ if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); import {initDriver, destroyDriver} from "../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + describe('Connection', () => { it('Test GRPC connection', async () => { let driver = await initDriver({endpoint: process.env.YDB_ENDPOINT || 'grpc://localhost:2136'}); diff --git a/src/__tests__/e2e/query-service/method-execute.ts b/src/__tests__/e2e/query-service/method-execute.ts index 1c72c14a..7201cdb0 100644 --- a/src/__tests__/e2e/query-service/method-execute.ts +++ b/src/__tests__/e2e/query-service/method-execute.ts @@ -14,6 +14,8 @@ import ExecMode = Ydb.Query.ExecMode; import {RetryParameters} from "../../../retries/retryParameters"; import {RetryStrategy} from "../../../retries/retryStrategy"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; const TABLE_NAME = 'test_table_1' diff --git a/src/__tests__/e2e/query-service/query-service-client.ts b/src/__tests__/e2e/query-service/query-service-client.ts index d3825b25..05aeff22 100644 --- a/src/__tests__/e2e/query-service/query-service-client.ts +++ b/src/__tests__/e2e/query-service/query-service-client.ts @@ -7,6 +7,8 @@ import fs from "fs"; import {AUTO_TX} from "../../../table"; import {QuerySession, IExecuteResult} from "../../../query"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; diff --git a/src/__tests__/e2e/query-service/rows-conversion.ts b/src/__tests__/e2e/query-service/rows-conversion.ts index 8c824826..888aa347 100644 --- a/src/__tests__/e2e/query-service/rows-conversion.ts +++ b/src/__tests__/e2e/query-service/rows-conversion.ts @@ -12,6 +12,8 @@ import {Context} from "../../../context"; import {RetryParameters} from "../../../retries/retryParameters"; import {RetryStrategy} from "../../../retries/retryStrategy"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; const TABLE_NAME = 'test_table_3' diff --git a/src/__tests__/e2e/query-service/transactions.ts b/src/__tests__/e2e/query-service/transactions.ts index 2b9a4009..4cc371e4 100644 --- a/src/__tests__/e2e/query-service/transactions.ts +++ b/src/__tests__/e2e/query-service/transactions.ts @@ -11,6 +11,8 @@ import {Context} from "../../../context"; import {RetryParameters} from "../../../retries/retryParameters"; import {RetryStrategy} from "../../../retries/retryStrategy"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; diff --git a/src/__tests__/e2e/retries.test.ts b/src/__tests__/e2e/retries.test.ts index 633bba60..ccb1ae3b 100644 --- a/src/__tests__/e2e/retries.test.ts +++ b/src/__tests__/e2e/retries.test.ts @@ -1,4 +1,3 @@ -if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); import Driver from '../../driver'; import { Aborted, @@ -26,7 +25,7 @@ import {pessimizable} from "../../utils"; import {destroyDriver, initDriver} from "../../utils/test"; import {LogLevel, SimpleLogger} from "../../logger/simple-logger"; -const MAX_RETRIES = 3; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); const logger = new SimpleLogger({level: LogLevel.error}); class ErrorThrower { diff --git a/src/__tests__/e2e/table-service/alter-table.test.ts b/src/__tests__/e2e/table-service/alter-table.test.ts index 96e942f8..ab74307f 100644 --- a/src/__tests__/e2e/table-service/alter-table.test.ts +++ b/src/__tests__/e2e/table-service/alter-table.test.ts @@ -11,6 +11,8 @@ import { } from "../../../table"; import {initDriver, destroyDriver} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const getTableName = () => `table_alter_${Math.trunc(1000 * Math.random())}`; describe('Alter table', () => { diff --git a/src/__tests__/e2e/table-service/bulk-upsert.test.ts b/src/__tests__/e2e/table-service/bulk-upsert.test.ts index f248c264..90dcd92e 100644 --- a/src/__tests__/e2e/table-service/bulk-upsert.test.ts +++ b/src/__tests__/e2e/table-service/bulk-upsert.test.ts @@ -4,6 +4,8 @@ import Driver from '../../../driver'; import {TableSession} from "../../../table"; import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + async function readTable(session: TableSession): Promise { const rows: Row[] = []; diff --git a/src/__tests__/e2e/table-service/bytestring-identity.test.ts b/src/__tests__/e2e/table-service/bytestring-identity.test.ts index ce6bbc47..23d45677 100644 --- a/src/__tests__/e2e/table-service/bytestring-identity.test.ts +++ b/src/__tests__/e2e/table-service/bytestring-identity.test.ts @@ -5,6 +5,8 @@ import {withRetries} from '../../../retries_obsoleted'; import {Column, TableSession, TableDescription} from "../../../table"; import {initDriver, destroyDriver, TABLE} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + async function createTable(session: TableSession) { await session.dropTable(TABLE); await session.createTable( diff --git a/src/__tests__/e2e/table-service/create-table.test.ts b/src/__tests__/e2e/table-service/create-table.test.ts index 85161961..a26ad7c0 100644 --- a/src/__tests__/e2e/table-service/create-table.test.ts +++ b/src/__tests__/e2e/table-service/create-table.test.ts @@ -6,6 +6,8 @@ import {Ydb} from 'ydb-sdk-proto'; import {Column, DescribeTableSettings, TableDescription} from "../../../table"; import {initDriver, destroyDriver} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const getTableName = () => `table_create_${Math.trunc(100000 * Math.random())}`; describe('Create table', () => { diff --git a/src/__tests__/e2e/table-service/graceful-session-close.test.ts b/src/__tests__/e2e/table-service/graceful-session-close.test.ts index cd8e851c..8e0a4c11 100644 --- a/src/__tests__/e2e/table-service/graceful-session-close.test.ts +++ b/src/__tests__/e2e/table-service/graceful-session-close.test.ts @@ -7,6 +7,8 @@ import {initDriver, destroyDriver} from "../../../utils/test"; const SHUTDOWN_URL = process.env.YDB_SHUTDOWN_URL || 'http://localhost:8765/actors/kqp_proxy?force_shutdown=all'; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + describe('Graceful session close', () => { // TODO: Fix and enable test nce issue will be resolved https://github.com/ydb-platform/ydb/issues/2981 diff --git a/src/__tests__/e2e/table-service/read-table.test.ts b/src/__tests__/e2e/table-service/read-table.test.ts index d2d5cf02..dd62ce1c 100644 --- a/src/__tests__/e2e/table-service/read-table.test.ts +++ b/src/__tests__/e2e/table-service/read-table.test.ts @@ -4,6 +4,8 @@ import {TypedValues, TypedData} from '../../../types'; import {ReadTableSettings, TableSession} from "../../../table"; import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + async function readTable(session: TableSession, settings: ReadTableSettings): Promise { const rows: TypedData[] = []; diff --git a/src/__tests__/e2e/table-service/scan-query.test.ts b/src/__tests__/e2e/table-service/scan-query.test.ts index 07b0c797..1e85292f 100644 --- a/src/__tests__/e2e/table-service/scan-query.test.ts +++ b/src/__tests__/e2e/table-service/scan-query.test.ts @@ -4,6 +4,8 @@ import {TypedData} from '../../../types'; import {TableSession} from "../../../table"; import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + async function executeScanQuery(session: TableSession): Promise { const query = `SELECT * FROM ${TABLE};`; diff --git a/src/__tests__/e2e/table-service/types.test.ts b/src/__tests__/e2e/table-service/types.test.ts index 5210900e..259d4da0 100644 --- a/src/__tests__/e2e/table-service/types.test.ts +++ b/src/__tests__/e2e/table-service/types.test.ts @@ -6,6 +6,8 @@ import {TypedData, TypedValues, Types} from '../../../types'; import NullValue = google.protobuf.NullValue; import {initDriver, destroyDriver} from "../../../utils/test"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + describe('Types', () => { let driver: Driver; diff --git a/src/__tests__/e2e/topic-service/internal.test.ts b/src/__tests__/e2e/topic-service/internal.test.ts index 247d1545..4ad0e44b 100644 --- a/src/__tests__/e2e/topic-service/internal.test.ts +++ b/src/__tests__/e2e/topic-service/internal.test.ts @@ -17,6 +17,8 @@ import {Context} from "../../../context"; import {RetryParameters} from "../../../retries/retryParameters"; import {RetryStrategy} from "../../../retries/retryStrategy"; +if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); + const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; diff --git a/src/__tests__/e2e/topic-service/send-messages.test.ts b/src/__tests__/e2e/topic-service/send-messages.test.ts index e35ecb23..f769c039 100644 --- a/src/__tests__/e2e/topic-service/send-messages.test.ts +++ b/src/__tests__/e2e/topic-service/send-messages.test.ts @@ -1,9 +1,100 @@ +import {AnonymousAuthService, Driver as YDB} from '../../../index'; +import {google, Ydb} from "ydb-sdk-proto"; + if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config(); -// import {AnonymousAuthService, Driver as YDB} from '../../../index'; -// import {google, Ydb} from "ydb-sdk-proto"; // create topic +xdescribe('Topic: Send messages', () => { + let ydb: YDB | undefined; + + beforeEach(async () => { + ydb = new YDB({ + connectionString: 'grpc://localhost:2136/?database=local', + authService: new AnonymousAuthService(), + }); + }); + + afterEach(async () => { + if (ydb) { + await ydb.destroy(); + ydb = undefined; + } + }); + + it('General', async () => { + const topicClient = await ydb!.topic; + + await topicClient.createTopic({ + path: 'testTopic' + }); + + const writer = await topicClient.createWriter({ + path: 'testTopic' + }); + + const res1 = await writer.sendMessages({ + // tx: + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.alloc(10, '1234567890'), + uncompressedSize: '1234567890'.length, + seqNo: 1, + createdAt: google.protobuf.Timestamp.create({ + seconds: 123 /* Math.trunc(Date.now() / 1000) */, + nanos: 456 /* Date.now() % 1000 */, + }), + messageGroupId: 'abc', // TODO: Check examples + partitionId: 1, + // metadataItems: // TODO: Should I use this? + }], + }); + + console.info('res1:', res1); + + const res2 = await writer.sendMessages({ + // tx: + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.alloc(10, '1234567890'), + uncompressedSize: '1234567890'.length, + seqNo: 1, + createdAt: google.protobuf.Timestamp.create({ + seconds: 123 /*Date.now() / 1000*/, + nanos: 456 /*Date.now() % 1000*/, + }), + messageGroupId: 'abc', // TODO: Check examples + partitionId: 1, + // metadataItems: // TODO: Should I use this? + }], + }); + + console.info('res2:', res2); + + const res3 = await writer.sendMessages({ + // tx: + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.alloc(10, '1234567890'), + uncompressedSize: '1234567890'.length, + seqNo: 1, + createdAt: google.protobuf.Timestamp.create({ + seconds: 123 /*Date.now() / 1000*/, + nanos: 456 /*Date.now() % 1000*/, + }), + messageGroupId: 'abc', // TODO: Check examples + partitionId: 1, + // metadataItems: // TODO: Should I use this? + }], + }); + + console.info('res3:', res3); + + // TODO: Send few messages + + // TODO: Wait for ack + + // TODO: Close before all messages are acked // xdescribe('Topic: Send messages', () => { // let ydb: YDB | undefined; diff --git a/src/errors.ts b/src/errors.ts index 92137d07..1cef9d4b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -46,6 +46,7 @@ export enum StatusCode { UNAUTHENTICATED = CLIENT_STATUSES_FIRST + 30, // SDK local SESSION_POOL_EMPTY = CLIENT_STATUSES_FIRST + 40, // SDK local + RETRIES_EXCEEDED = CLIENT_STATUSES_FIRST + 50, // SDK local } /** @@ -314,15 +315,20 @@ export class ClientResourceExhausted extends TransportError { public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true); } +export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying + static status = StatusCode.CLIENT_CANCELED; + public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false); +} + const TRANSPORT_ERROR_CODES = new Map([ - [GrpcStatus.CANCELLED, Cancelled], + [GrpcStatus.CANCELLED, ClientCancelled], [GrpcStatus.UNAVAILABLE, TransportUnavailable], [GrpcStatus.DEADLINE_EXCEEDED, ClientDeadlineExceeded], [GrpcStatus.RESOURCE_EXHAUSTED, ClientResourceExhausted] ]); -export class ClientCancelled extends YdbError { - static status = StatusCode.CLIENT_CANCELED; +export class RetriesExceeded extends YdbError { + static status = StatusCode.RETRIES_EXCEEDED; public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false); constructor(public readonly cause: Error) { diff --git a/src/retries/retryStrategy.ts b/src/retries/retryStrategy.ts index 520a5e7b..26d792a4 100644 --- a/src/retries/retryStrategy.ts +++ b/src/retries/retryStrategy.ts @@ -1,4 +1,4 @@ -import {Backoff, ClientCancelled, SpecificErrorRetryPolicy} from "../errors"; +import {Backoff, RetriesExceeded, SpecificErrorRetryPolicy} from "../errors"; import {HasLogger} from "../logger/has-logger"; import {Logger} from "../logger/simple-logger"; import {RetryParameters} from "./retryParameters"; @@ -45,7 +45,7 @@ export class RetryStrategy implements HasLogger { while (true) { if (maxRetries !== 0 && attemptsCounter >= maxRetries) { // to support the old logic for a while this.logger.debug(tooManyAttempts, attemptsCounter); - throw new ClientCancelled(new Error(`Too many attempts: ${attemptsCounter}`)); + throw new RetriesExceeded(new Error(`Too many attempts: ${attemptsCounter}`)); } let r: RetryLambdaResult; try { diff --git a/src/retries_obsoleted.ts b/src/retries_obsoleted.ts index d27de982..139f6107 100644 --- a/src/retries_obsoleted.ts +++ b/src/retries_obsoleted.ts @@ -2,7 +2,6 @@ import {YdbError, TransportError} from './errors'; import * as errors from './errors'; import * as utils from "./utils"; import {Logger} from "./logger/simple-logger"; -// import {getDefaultLogger} from "./logger/get-default-logger"; export class BackoffSettings { /** @@ -56,10 +55,11 @@ const RETRYABLE_ERRORS_FAST = [ errors.NotFound, errors.TransportUnavailable, errors.ClientDeadlineExceeded, + errors.ClientCancelled, ]; const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted]; -class RetryStrategy { +export class RetryStrategy { // private logger: Logger; constructor( public methodName = 'UnknownClass::UnknownMethod', diff --git a/src/table/table-session.ts b/src/table/table-session.ts index eb70808b..a36fe92a 100644 --- a/src/table/table-session.ts +++ b/src/table/table-session.ts @@ -21,7 +21,7 @@ import * as grpc from "@grpc/grpc-js"; import EventEmitter from "events"; import {ICreateSessionResult, SessionEvent, TableService} from "./table-session-pool"; import {Endpoint} from "../discovery"; -import {retryable} from "../retries_obsoleted"; +import {retryable, RetryParameters, RetryStrategy} from "../retries_obsoleted"; import {MissingStatus, MissingValue, SchemeError, YdbError} from "../errors"; import {ResponseMetadataKeys} from "../constants"; import {pessimizable} from "../utils"; @@ -171,15 +171,21 @@ export class PrepareQuerySettings extends OperationParamsSettings { } export class ExecuteQuerySettings extends OperationParamsSettings { - keepInCache: boolean = false; + keepInCache?: boolean = false; collectStats?: Ydb.Table.QueryStatsCollection.Mode; onResponseMetadata?: (metadata: grpc.Metadata) => void; + idempotent: boolean = false; withKeepInCache(keepInCache: boolean) { this.keepInCache = keepInCache; return this; } + withIdempotent(idempotent: boolean) { + this.idempotent = idempotent; + return this; + } + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { this.collectStats = collectStats; return this; @@ -258,6 +264,8 @@ export class ExecuteScanQuerySettings { } } +let executeQueryRetryer: RetryStrategy; + export class TableSession extends EventEmitter implements ICreateSessionResult { private beingDeleted = false; private free = true; @@ -518,7 +526,13 @@ export class TableSession extends EventEmitter implements ICreateSessionResult { if (keepInCache) { request.queryCachePolicy = {keepInCache}; } - const response = await this.api.executeDataQuery(request); + + if (!executeQueryRetryer) executeQueryRetryer = new RetryStrategy('TableSession:executeQuery', new RetryParameters(), this.logger); + + const response = + settings?.idempotent + ? await executeQueryRetryer.retry(() => this.api.executeDataQuery(request)) + : await this.api.executeDataQuery(request); const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata)); return ExecuteQueryResult.decode(payload); } diff --git a/src/utils/test/create-table.ts b/src/utils/test/create-table.ts index 3564c406..a20ab94e 100644 --- a/src/utils/test/create-table.ts +++ b/src/utils/test/create-table.ts @@ -1,5 +1,5 @@ -import {Column, TableDescription, TableSession} from "../../table"; -import {withRetries} from "../../retries_obsoleted"; +import {AUTO_TX, Column, ExecuteQuerySettings, TableDescription, TableSession} from "../../table"; +// import {withRetries} from "../../retries_obsoleted"; import {Types} from "../../types"; import {Row} from "./row"; @@ -29,10 +29,23 @@ DECLARE $data AS List>; REPLACE INTO ${TABLE} SELECT * FROM AS_TABLE($data);`; - await withRetries(async () => { - const preparedQuery = await session.prepareQuery(query); - await session.executeQuery(preparedQuery, { + // Now we can specify that the operation should be repeated in case of an error by specifying that it is idempotent + + // Old code: + + // await withRetries(async () => { + // const preparedQuery = await session.prepareQuery(query); + // await session.executeQuery(preparedQuery, { + // '$data': Row.asTypedCollection(rows), + // }); + // }); + + // New code variant: + + const preparedQuery = await session.prepareQuery(query); + await session.executeQuery(preparedQuery, { '$data': Row.asTypedCollection(rows), - }); - }); + }, + AUTO_TX, + new ExecuteQuerySettings().withIdempotent(true)); }