diff --git a/CHANGELOG.md b/CHANGELOG.md index 333edd4f..9f1c8d08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -286,7 +286,7 @@ All notable changes to this project will be documented in this file. See [standa - decimal value is present as string instead of bigint (it wasn't working for float values before) - fix uuid and tz-date types conversion (it wasn't working before) -* signatures of most methods in TableSession are changed: +* signatures of most methods in QuerySession are changed: - executeQuery Before: `(query, params, txControl, operationParams?, settings?, collectStats?)` After: `(query, params, txControl, settings?)` @@ -336,7 +336,7 @@ All notable changes to this project will be documented in this file. See [standa * drop support of old environment variables ([963819a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/963819af9209a45749f5118077f1da4bdb390fa6)) * reorganize signature of SchemeClient's methods ([734d57a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/734d57a2dd7c655cf727b96df415212504339cf8)) -* reorganize signatures of TableSession's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) +* reorganize signatures of QuerySession's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) * use identity names conversion in TypedData ([275598a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/275598aa444e1e977386a3dadd02bbc9ba01f38e)) ### [2.9.2](https://www.github.com/ydb-platform/ydb-nodejs-sdk/compare/v2.9.1...v2.9.2) (2022-02-09) @@ -431,7 +431,7 @@ All notable changes to this project will be documented in this file. See [standa * and many other changes in protobufs. ### 1.10.0 -* Add `alterTable` method to TableSession class +* Add `alterTable` method to QuerySession class * Put compiled protobufs to a separate 'ydb-sdk' namespace ### 1.9.0 diff --git a/package-lock.json b/package-lock.json index fc52bf36..190a1b51 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9639,9 +9639,9 @@ } }, "node_modules/ydb-sdk-proto": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.1.0.tgz", - "integrity": "sha512-O/cmNQJjgv11QM51xj3szS1rU/0qRYqAZF8rlsCDzsK+JQt76+LW8d+N8mzIikjXap53lK4I6s1NudeuT3OviQ==" + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", + "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" }, "node_modules/yn": { "version": "3.1.1", @@ -16980,9 +16980,9 @@ "dev": true }, "ydb-sdk-proto": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.1.0.tgz", - "integrity": "sha512-O/cmNQJjgv11QM51xj3szS1rU/0qRYqAZF8rlsCDzsK+JQt76+LW8d+N8mzIikjXap53lK4I6s1NudeuT3OviQ==" + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", + "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" }, "yn": { "version": "3.1.1", diff --git "a/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" "b/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" new file mode 100644 index 00000000..31414a4a --- /dev/null +++ "b/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" @@ -0,0 +1,86 @@ +import Driver from '../../../driver'; +import { + createTable, + destroyDriver, + fillTableWithData, + initDriver, + Row, + TABLE +} from '../../../test-utils'; +import {ReadTableSettings} from '../../../table'; +import {TypedValues, TypedData} from '../../../types'; +import {QuerySession} from "../../../query/query-session"; + +// async function execQuery(session: QuerySession): Promise { +// const rows: TypedData[] = []; +// +// await session.streamReadTable(TABLE, (result) => { +// if (result.resultSet) { +// rows.push(...Row.createNativeObjects(result.resultSet)); +// } +// }, settings); +// +// return rows; +// } + +describe('Query service', () => { + let driver: Driver; + + beforeAll(async () => { + driver = await initDriver(); + }); + + afterAll(async () => await destroyDriver(driver)); + + it('Test', async () => { + await driver.tableClient.withSession(async (session) => { + const expectedRows = [ + new Row({id: 1, title: 'one'}), + new Row({id: 2, title: 'two'}), + ]; + + await createTable(session); + + // TODO: Create few tables + + // TODO: Make quries with few datasets + + await fillTableWithData(session, expectedRows); + + const res = await driver.queryClient.exec({ + query: 'SELECT * FROM ${TABLE}', + // Is callback a good name? + callback: (session: QuerySession) => { + // session.beginTransaction(), + // TODO: query -> array + // return; + }, + }); + + + + // { + // const rows = await execQuery(session, new ReadTableSettings()); + // expect(rows).toEqual(expectedRows); + // } + // + // { + // const rows = await readTable(session, new ReadTableSettings().withKeyRange({ + // greaterOrEqual: TypedValues.tuple(TypedValues.optional(TypedValues.uint64(1))), + // lessOrEqual: TypedValues.tuple(TypedValues.optional(TypedValues.uint64(2))), + // })); + // + // expect(rows).toEqual(expectedRows); + // } + // + // { + // const rows = await readTable(session, new ReadTableSettings().withKeyRange({ + // greater: TypedValues.tuple(TypedValues.optional(TypedValues.uint64(1))), + // lessOrEqual: TypedValues.tuple(TypedValues.optional(TypedValues.uint64(2))), + // })); + // + // expect(rows).toEqual(expectedRows.slice(1)); + // } + }); + }); +}); diff --git a/src/__tests__/alter-table.test.ts b/src/__tests__/e2e/table-service/alter-table.test.ts similarity index 97% rename from src/__tests__/alter-table.test.ts rename to src/__tests__/e2e/table-service/alter-table.test.ts index f667ecaf..f0135901 100644 --- a/src/__tests__/alter-table.test.ts +++ b/src/__tests__/e2e/table-service/alter-table.test.ts @@ -1,5 +1,5 @@ -import Driver from '../driver'; -import { destroyDriver, initDriver } from '../test-utils'; +import Driver from '../../../driver'; +import { destroyDriver, initDriver } from '../../../test-utils'; import { AlterTableDescription, AlterTableSettings, @@ -7,8 +7,8 @@ import { OperationParams, TableDescription, TableIndex, -} from '../table/table-session'; -import { Types } from '../types'; +} from '../../../table'; +import { Types } from '../../../types'; const getTableName = () => `table_alter_${Math.trunc(1000 * Math.random())}`; diff --git a/src/__tests__/bulk-upsert.test.ts b/src/__tests__/e2e/table-service/bulk-upsert.test.ts similarity index 93% rename from src/__tests__/bulk-upsert.test.ts rename to src/__tests__/e2e/table-service/bulk-upsert.test.ts index b10dd4b1..91c74d27 100644 --- a/src/__tests__/bulk-upsert.test.ts +++ b/src/__tests__/e2e/table-service/bulk-upsert.test.ts @@ -1,4 +1,4 @@ -import Driver from '../driver'; +import Driver from '../../../driver'; import { createTable, destroyDriver, @@ -6,8 +6,8 @@ import { initDriver, Row, TABLE -} from '../test-utils'; -import {TableSession} from '../table/table-session'; +} from '../../../test-utils'; +import {TableSession} from '../../../table'; import {Ydb} from 'ydb-sdk-proto'; async function readTable(session: TableSession): Promise { diff --git a/src/__tests__/bytestring-identity.test.ts b/src/__tests__/e2e/table-service/bytestring-identity.test.ts similarity index 90% rename from src/__tests__/bytestring-identity.test.ts rename to src/__tests__/e2e/table-service/bytestring-identity.test.ts index ec2819c5..0ee2ec1c 100644 --- a/src/__tests__/bytestring-identity.test.ts +++ b/src/__tests__/e2e/table-service/bytestring-identity.test.ts @@ -1,8 +1,8 @@ -import Driver from '../driver'; -import {destroyDriver, initDriver, TABLE} from '../test-utils'; -import {Column, TableSession, TableDescription} from '../table/table-session'; -import {declareType, TypedData, Types} from '../types'; -import {withRetries} from '../retries'; +import Driver from '../../../driver'; +import {destroyDriver, initDriver, TABLE} from '../../../test-utils'; +import {Column, TableSession, TableDescription} from '../../../table'; +import {declareType, TypedData, Types} from '../../../types'; +import {withRetries} from '../../../retries'; async function createTable(session: TableSession) { await session.dropTable(TABLE); diff --git a/src/__tests__/connection.test.ts b/src/__tests__/e2e/table-service/connection.test.ts similarity index 90% rename from src/__tests__/connection.test.ts rename to src/__tests__/e2e/table-service/connection.test.ts index f5ae1f3e..627533fc 100644 --- a/src/__tests__/connection.test.ts +++ b/src/__tests__/e2e/table-service/connection.test.ts @@ -1,4 +1,4 @@ -import {initDriver, destroyDriver} from '../test-utils'; +import {initDriver, destroyDriver} from '../../../test-utils'; describe('Connection', () => { it('Test GRPC connection', async () => { diff --git a/src/__tests__/create-table.test.ts b/src/__tests__/e2e/table-service/create-table.test.ts similarity index 98% rename from src/__tests__/create-table.test.ts rename to src/__tests__/e2e/table-service/create-table.test.ts index 939352e3..d4db86e4 100644 --- a/src/__tests__/create-table.test.ts +++ b/src/__tests__/e2e/table-service/create-table.test.ts @@ -1,7 +1,7 @@ -import Driver from '../driver'; -import {destroyDriver, initDriver} from '../test-utils'; -import {Column, DescribeTableSettings, TableDescription} from '../table/table-session'; -import {TypedValues, Types} from '../types'; +import Driver from '../../../driver'; +import {destroyDriver, initDriver} from '../../../test-utils'; +import {Column, DescribeTableSettings, TableDescription} from '../../../table'; +import {TypedValues, Types} from '../../../types'; import Long from 'long'; import {Ydb} from 'ydb-sdk-proto'; diff --git a/src/__tests__/graceful-session-close.test.ts b/src/__tests__/e2e/table-service/graceful-session-close.test.ts similarity index 90% rename from src/__tests__/graceful-session-close.test.ts rename to src/__tests__/e2e/table-service/graceful-session-close.test.ts index b288a21f..5a33d3bd 100644 --- a/src/__tests__/graceful-session-close.test.ts +++ b/src/__tests__/e2e/table-service/graceful-session-close.test.ts @@ -1,7 +1,7 @@ import http from 'http'; -import Driver from "../driver"; -import {destroyDriver, initDriver} from "../test-utils"; -import {sleep} from "../utils"; +import Driver from "../../../driver"; +import {destroyDriver, initDriver} from "../../../test-utils"; +import {sleep} from "../../../utils"; const SHUTDOWN_URL = process.env.YDB_SHUTDOWN_URL || 'http://localhost:8765/actors/kqp_proxy?force_shutdown=all'; diff --git a/src/__tests__/parse-connection-string.test.ts b/src/__tests__/e2e/table-service/parse-connection-string.test.ts similarity index 95% rename from src/__tests__/parse-connection-string.test.ts rename to src/__tests__/e2e/table-service/parse-connection-string.test.ts index ca62699a..3277390f 100644 --- a/src/__tests__/parse-connection-string.test.ts +++ b/src/__tests__/e2e/table-service/parse-connection-string.test.ts @@ -1,4 +1,4 @@ -import {parseConnectionString} from '../parse-connection-string'; +import {parseConnectionString} from '../../../parse-connection-string'; describe('Parse connection string', () => { it('test parseConnectionString', () => { diff --git a/src/__tests__/read-table.test.ts b/src/__tests__/e2e/table-service/read-table.test.ts similarity index 91% rename from src/__tests__/read-table.test.ts rename to src/__tests__/e2e/table-service/read-table.test.ts index b83af987..6bb0005c 100644 --- a/src/__tests__/read-table.test.ts +++ b/src/__tests__/e2e/table-service/read-table.test.ts @@ -1,4 +1,4 @@ -import Driver from '../driver'; +import Driver from '../../../driver'; import { createTable, destroyDriver, @@ -6,9 +6,9 @@ import { initDriver, Row, TABLE -} from '../test-utils'; -import {ReadTableSettings, TableSession} from '../table/table-session'; -import {TypedValues, TypedData} from '../types'; +} from '../../../test-utils'; +import {ReadTableSettings, TableSession} from '../../../table'; +import {TypedValues, TypedData} from '../../../types'; async function readTable(session: TableSession, settings: ReadTableSettings): Promise { const rows: TypedData[] = []; diff --git a/src/__tests__/retries.test.ts b/src/__tests__/e2e/table-service/retries.test.ts similarity index 87% rename from src/__tests__/retries.test.ts rename to src/__tests__/e2e/table-service/retries.test.ts index cdb53d2c..ef08830d 100644 --- a/src/__tests__/retries.test.ts +++ b/src/__tests__/e2e/table-service/retries.test.ts @@ -1,5 +1,5 @@ -import {Endpoint} from '../discovery'; -import Driver from '../driver'; +import {Endpoint} from '../../../discovery'; +import Driver from '../../../driver'; import { Aborted, BadRequest, @@ -18,11 +18,11 @@ import { Unavailable, Undetermined, YdbError, -} from '../errors'; -import {FallbackLogger} from '../logging'; -import {RetryParameters, retryable} from '../retries'; -import {destroyDriver, initDriver} from '../test-utils'; -import {pessimizable} from '../utils'; +} from '../../../errors'; +import {FallbackLogger} from '../../../logging'; +import {RetryParameters, retryable} from '../../../retries'; +import {destroyDriver, initDriver} from '../../../test-utils'; +import {pessimizable} from '../../../utils'; const logger = new FallbackLogger({level: 'error'}); class ErrorThrower { diff --git a/src/__tests__/scan-query.test.ts b/src/__tests__/e2e/table-service/scan-query.test.ts similarity index 87% rename from src/__tests__/scan-query.test.ts rename to src/__tests__/e2e/table-service/scan-query.test.ts index befba963..bcf0c616 100644 --- a/src/__tests__/scan-query.test.ts +++ b/src/__tests__/e2e/table-service/scan-query.test.ts @@ -1,4 +1,4 @@ -import Driver from '../driver'; +import Driver from '../../../driver'; import { TABLE, createTable, @@ -6,9 +6,9 @@ import { fillTableWithData, initDriver, Row, -} from '../test-utils'; -import {TableSession} from '../table/table-session'; -import {TypedData} from '../types'; +} from '../../../test-utils'; +import {TableSession} from '../../../table'; +import {TypedData} from '../../../types'; async function executeScanQuery(session: TableSession): Promise { const query = `SELECT * FROM ${TABLE};`; diff --git a/src/__tests__/types.test.ts b/src/__tests__/e2e/table-service/types.test.ts similarity index 99% rename from src/__tests__/types.test.ts rename to src/__tests__/e2e/table-service/types.test.ts index 959e6b4d..bed91879 100644 --- a/src/__tests__/types.test.ts +++ b/src/__tests__/e2e/table-service/types.test.ts @@ -1,8 +1,8 @@ import Long from 'long'; import {google, Ydb} from 'ydb-sdk-proto'; -import Driver from '../driver'; -import {initDriver, destroyDriver} from '../test-utils'; -import {TypedData, TypedValues, Types} from '../types'; +import Driver from '../../../driver'; +import {initDriver, destroyDriver} from '../../../test-utils'; +import {TypedData, TypedValues, Types} from '../../../types'; import NullValue = google.protobuf.NullValue; describe('Types', () => { diff --git a/src/credentials.ts b/src/credentials.ts index 4b4eb602..39a2c83c 100644 --- a/src/credentials.ts +++ b/src/credentials.ts @@ -1,7 +1,7 @@ import * as grpc from '@grpc/grpc-js'; import jwt from 'jsonwebtoken'; import {DateTime} from 'luxon'; -import {getOperationPayload, GrpcService, sleep, withTimeout} from './utils'; +import {GrpcService, sleep, withTimeout} from './utils'; import {yandex, Ydb} from 'ydb-sdk-proto'; import {ISslCredentials, makeDefaultSslCredentials} from './ssl-credentials'; import IamTokenService = yandex.cloud.iam.v1.IamTokenService; @@ -9,6 +9,7 @@ import AuthServiceResult = Ydb.Auth.LoginResult; import ICreateIamTokenResponse = yandex.cloud.iam.v1.ICreateIamTokenResponse; import type {MetadataTokenService} from '@yandex-cloud/nodejs-sdk/dist/token-service/metadata-token-service'; import {retryable} from './retries'; +import {getOperationPayload} from "./table/table-utils"; function makeCredentialsMetadata(token: string): grpc.Metadata { const metadata = new grpc.Metadata(); @@ -46,7 +47,7 @@ export class AnonymousAuthService implements IAuthService { interface StaticCredentialsAuthOptions { /** Custom ssl sertificates. If you use it in driver, you must use it here too */ sslCredentials?: ISslCredentials; - /** + /** * Timeout for token request in milliseconds * @default 10 * 1000 */ diff --git a/src/discovery.ts b/src/discovery.ts index f362cb31..e642178b 100644 --- a/src/discovery.ts +++ b/src/discovery.ts @@ -2,7 +2,7 @@ import _ from 'lodash'; import EventEmitter from 'events'; import {DateTime} from 'luxon'; import {Ydb} from "ydb-sdk-proto"; -import {AuthenticatedService, getOperationPayload, withTimeout} from "./utils"; +import {AuthenticatedService, withTimeout} from "./utils"; import {IAuthService} from "./credentials"; import {retryable} from "./retries"; // noinspection ES6PreferShortImport @@ -11,6 +11,7 @@ import DiscoveryServiceAPI = Ydb.Discovery.V1.DiscoveryService; import IEndpointInfo = Ydb.Discovery.IEndpointInfo; import {Events} from "./constants"; import {ISslCredentials} from './ssl-credentials'; +import {getOperationPayload} from "./table/table-utils"; type SuccessDiscoveryHandler = (result: Endpoint[]) => void; diff --git a/src/driver.ts b/src/driver.ts index bdfcaf8a..065818dc 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -9,6 +9,7 @@ import {ClientOptions} from './utils'; import {parseConnectionString} from './parse-connection-string'; import {makeSslCredentials, ISslCredentials} from './ssl-credentials'; import {TableClient} from "./table"; +import {QueryClient} from "./query/query-sessions-pool"; export interface IPoolSettings { minLimit?: number; @@ -38,6 +39,7 @@ export default class Driver { private discoveryService: DiscoveryService; public tableClient: TableClient; + public queryClient: QueryClient; public schemeClient: SchemeService; constructor(settings: IDriverSettings) { @@ -79,6 +81,15 @@ export default class Driver { discoveryService: this.discoveryService, logger: this.logger, }); + this.queryClient = new QueryClient({ + database: this.database, + authService: this.authService, + sslCredentials: this.sslCredentials, + poolSettings: this.poolSettings, + clientOptions: this.clientOptions, + discoveryService: this.discoveryService, + logger: this.logger, + }); this.schemeClient = new SchemeClient({ database: this.database, authService: this.authService, diff --git a/src/errors.ts b/src/errors.ts index 44248404..2bd10c9d 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,7 +1,6 @@ import {StatusObject as GrpcStatusObject} from '@grpc/grpc-js'; import {Ydb} from 'ydb-sdk-proto'; import ApiStatusCode = Ydb.StatusIds.StatusCode; -import IOperation = Ydb.Operations.IOperation; import {Status as GrpcStatus} from '@grpc/grpc-js/build/src/constants'; const TRANSPORT_STATUSES_FIRST = 401000; @@ -45,18 +44,21 @@ export class YdbError extends Error { return issues ? JSON.stringify(issues, null, 2) : ''; } - static checkStatus(operation: IOperation) { - if (!operation.status) { + static checkStatus(result: { + status?: (Ydb.StatusIds.StatusCode|null); + issues?: (Ydb.Issue.IIssueMessage[]|null); + }) { + if (!result.status) { throw new MissingStatus('Missing status!'); } - const status = operation.status as unknown as StatusCode; - if (operation.status && !SUCCESS_CODES.has(status)) { + const status = result.status as unknown as StatusCode; + if (result.status && !SUCCESS_CODES.has(status)) { const ErrCls = SERVER_SIDE_ERROR_CODES.get(status); if (!ErrCls) { throw new Error(`Unexpected status code ${status}!`); } else { - throw new ErrCls(`${ErrCls.name} (code ${status}): ${YdbError.formatIssues(operation.issues)}`, operation.issues); + throw new ErrCls(`${ErrCls.name} (code ${status}): ${YdbError.formatIssues(result.issues)}`, result.issues); } } } diff --git a/src/query/query-session.ts b/src/query/query-session.ts new file mode 100644 index 00000000..5ffdaa3d --- /dev/null +++ b/src/query/query-session.ts @@ -0,0 +1,116 @@ +// import {promisify} from 'util' +import EventEmitter from 'events'; +// import * as grpc from '@grpc/grpc-js'; +import {Ydb} from 'ydb-sdk-proto'; +import {pessimizable} from '../utils'; +import {Endpoint} from '../discovery'; +import {Logger} from '../logging'; +import {retryable} from '../retries'; +import QueryService = Ydb.Query.V1.QueryService; +import ICreateSessionResponse = Ydb.Query.ICreateSessionResponse; +import ITransactionSettings = Ydb.Query.ITransactionSettings; +import {SessionEvent} from "../table"; +import {ensureOperationSucceeded} from "./query-utils"; + +export class QuerySession extends EventEmitter implements ICreateSessionResponse { + // TODO: Allocate common functionality with querySession to a sessionBase class. It's likely that commo sessionsPool code will work both Query and Query + private beingDeleted = false; + private free = true; + private closing = false; + + constructor( + private api: QueryService, + public endpoint: Endpoint, + public sessionId: string, + private logger: Logger, + // private getResponseMetadata: (request: object) => grpc.Metadata | undefined + ) { + super(); + } + + async attach() { + // promisify(this.api.attachSession)() + const state = new Promise((resolve, reject) => { + this.api.attachSession({sessionId: this.sessionId}, (error, response) => { + console.info(1000, 'sessionId', this.sessionId, error, response); + if (error) reject(error); + resolve(response); + }); + }); + if (state) { + console.info(1100, 'state', state); + } + } + + acquire() { + this.free = false; + this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + return this; + } + + release() { + this.free = true; + this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + this.emit(SessionEvent.SESSION_RELEASE, this); + } + + public isFree() { + return this.free && !this.isDeleted(); + } + public isClosing() { + return this.closing; + } + public isDeleted() { + return this.beingDeleted; + } + + @retryable() + @pessimizable + public async delete(): Promise { + if (this.isDeleted()) { + return Promise.resolve(); + } + this.beingDeleted = true; + ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); // TODO: Shouldn't session delete has retry? + } + + @retryable() + @pessimizable + public async beginTransaction( + txSettings: ITransactionSettings, + )/*: Promise*/ { + // TODO: Add transactions pairs lock. How to handle errors? + const request: Ydb.Query.IBeginTransactionRequest = { + sessionId: this.sessionId, + txSettings, + }; + const response = ensureOperationSucceeded(await this.api.beginTransaction(request)); + const {txMeta} = response; + if (txMeta) { + return txMeta; + } + throw new Error('Could not begin new transaction, txMeta is empty!'); + } + + // @retryable() + // @pessimizable + // public async commitTransaction(txControl: IExistingTransaction): Promise { + // const request: Ydb.Query.ICommitTransactionRequest = { + // sessionId: this.sessionId, + // txId: txControl.txId, + // }; + // const response = await this.api.commitTransaction(request); + // ensureOperationSucceeded(this.processResponseMetadata(request, response)); + // } + + // @retryable() + // @pessimizable + // public async rollbackTransaction(txControl: IExistingTransaction): Promise { + // const request: Ydb.Query.IRollbackTransactionRequest = { + // sessionId: this.sessionId, + // txId: txControl.txId, + // }; + // const response = await this.api.rollbackTransaction(request); + // ensureOperationSucceeded(this.processResponseMetadata(request, response)); + // } +} diff --git a/src/query/query-sessions-pool.ts b/src/query/query-sessions-pool.ts new file mode 100644 index 00000000..492d3ea6 --- /dev/null +++ b/src/query/query-sessions-pool.ts @@ -0,0 +1,277 @@ +import {Ydb} from 'ydb-sdk-proto'; +import {IAuthService} from "../credentials"; +import {ISslCredentials} from "../ssl-credentials"; +import {IPoolSettings} from "../driver"; +import {AuthenticatedService, ClientOptions, pessimizable} from "../utils"; +import DiscoveryService, {Endpoint} from "../discovery"; +import {Logger} from "../logging"; +import EventEmitter from "events"; +import {Events} from "../constants"; +import _ from "lodash"; +import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors"; +import {retryable} from "../retries"; +import {QuerySession} from "./query-session"; +import {SessionEvent} from "../table/session-event"; +import QueryService = Ydb.Query.V1.QueryService; +import CreateSessionRequest = Ydb.Query.CreateSessionRequest; +import {ensureOperationSucceeded} from "./query-utils"; + +export class QuerySessionCreator extends AuthenticatedService { + public endpoint: Endpoint; + private readonly logger: Logger; + + constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { + const host = endpoint.toString(); + super(host, database, 'Ydb.Query.V1.QueryService', QueryService, authService, sslCredentials, clientOptions); + this.endpoint = endpoint; + this.logger = logger; + } + + @retryable() + @pessimizable + async create(): Promise { + const response = ensureOperationSucceeded(await this.api.createSession(CreateSessionRequest.create())); + const {sessionId} = response; + const session = new QuerySession(this.api, this.endpoint, sessionId, this.logger/*, this.getResponseMetadata.bind(this)*/); + await session.attach(); + return session; + } +} + +type SessionCallback = (session: QuerySession) => Promise; + +interface IQueryClientSettings { + database: string; + authService: IAuthService; + sslCredentials?: ISslCredentials; + poolSettings?: IPoolSettings; + clientOptions?: ClientOptions; + discoveryService: DiscoveryService; + logger: Logger; +} + +export class QuerySessionsPool extends EventEmitter { + private readonly database: string; + private readonly authService: IAuthService; + private readonly sslCredentials?: ISslCredentials; + private readonly clientOptions?: ClientOptions; + private readonly minLimit: number; + private readonly maxLimit: number; + private readonly sessions: Set; + private readonly sessionCreators: Map; + private readonly discoveryService: DiscoveryService; + private newSessionsRequested: number; + private sessionsBeingDeleted: number; + private readonly logger: Logger; + private readonly waiters: ((session: QuerySession) => void)[] = []; + + private static SESSION_MIN_LIMIT = 5; // TODO: Consider less sessions limit in case of serverless function + private static SESSION_MAX_LIMIT = 20; + + constructor(settings: IQueryClientSettings) { + super(); + this.database = settings.database; + this.authService = settings.authService; + this.sslCredentials = settings.sslCredentials; + this.clientOptions = settings.clientOptions; + this.logger = settings.logger; + const poolSettings = settings.poolSettings; + this.minLimit = poolSettings?.minLimit || QuerySessionsPool.SESSION_MIN_LIMIT; + this.maxLimit = poolSettings?.maxLimit || QuerySessionsPool.SESSION_MAX_LIMIT; + this.sessions = new Set(); + this.newSessionsRequested = 0; + this.sessionsBeingDeleted = 0; + this.sessionCreators = new Map(); + this.discoveryService = settings.discoveryService; + this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { + this.sessionCreators.delete(endpoint); + }); + this.prepopulateSessions(); + } + + public async destroy(): Promise { + this.logger.debug('Destroying pool...'); + await Promise.all(_.map([...this.sessions], (session: QuerySession) => this.deleteSession(session))); + this.logger.debug('Pool has been destroyed.'); + } + + private prepopulateSessions() { + _.forEach(_.range(this.minLimit), () => this.createSession()); + } + + private async getSessionCreator(): Promise { + const endpoint = await this.discoveryService.getEndpoint(); + if (!this.sessionCreators.has(endpoint)) { + const sessionService = new QuerySessionCreator(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); + this.sessionCreators.set(endpoint, sessionService); + } + return this.sessionCreators.get(endpoint) as QuerySessionCreator; + } + + private maybeUseSession(session: QuerySession) { + if (this.waiters.length > 0) { + const waiter = this.waiters.shift(); + if (typeof waiter === "function") { + waiter(session); + return true; + } + } + return false; + } + + private async createSession(): Promise { + const sessionCreator = await this.getSessionCreator(); + const session = await sessionCreator.create(); + session.on(SessionEvent.SESSION_RELEASE, async () => { + if (session.isClosing()) { + await this.deleteSession(session); + } else { + this.maybeUseSession(session); + } + }) + session.on(SessionEvent.SESSION_BROKEN, async () => { + await this.deleteSession(session); + }); + // TODO: Make long running connection ! + this.sessions.add(session); + return session; + } + + private deleteSession(session: QuerySession): Promise { + if (session.isDeleted()) { + return Promise.resolve(); + } + + this.sessionsBeingDeleted++; + // acquire new session as soon one of existing ones is deleted + if (this.waiters.length > 0) { + this.acquire().then((session) => { + if (!this.maybeUseSession(session)) { + session.release(); + } + }); + } + return session.delete() + // delete session in any case + .finally(() => { + this.sessions.delete(session); + this.sessionsBeingDeleted--; + }); + } + + private acquire(timeout: number = 0): Promise { + for (const session of this.sessions) { + if (session.isFree()) { + return Promise.resolve(session.acquire()); + } + } + + if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { + this.newSessionsRequested++; + return this.createSession() + .then((session) => { + return session.acquire(); + }) + .finally(() => { + this.newSessionsRequested--; + }); + } else { + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout; + + function waiter(session: QuerySession) { + clearTimeout(timeoutId); + resolve(session.acquire()); + } + + if (timeout) { + timeoutId = setTimeout(() => { + this.waiters.splice(this.waiters.indexOf(waiter), 1); + reject( + new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) + ); + }, timeout); + } + this.waiters.push(waiter); + }); + } + } + + private async _withSession(session: QuerySession, callback: SessionCallback, maxRetries = 0): Promise { + try { + const result = await callback(session); + session.release(); + return result; + } catch (error) { + // TODO: Change the repetition strategy to one with different delays + // TODO: Remove repetitions on methods (@Retry) within session + // TODO: Add idempotency sign and do method with named parameters + // TODO: Мark _withSession as deprecated. Consider all operationj NOT idempotent + if (error instanceof BadSession || error instanceof SessionBusy) { + this.logger.debug('Encountered bad or busy session, re-creating the session'); + session.emit(SessionEvent.SESSION_BROKEN); + session = await this.createSession(); + if (maxRetries > 0) { + this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); + session.acquire(); + return this._withSession(session, callback, maxRetries - 1); + } + } else { + session.release(); + } + throw error; + } + } + + public async do(options: { + cb: SessionCallback, + timeout: number | undefined, + // TODO: Make all parameters + // txControl + // parameters + // transaction + }): T { + const session = await this.acquire(options.timeout); + // TODO: Start transaction + return this._withSession(session, cb); + } + + /* + public async withSession(callback: SessionCallback, timeout: number = 0): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback); + } + + public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback, maxRetries); + } + */ +} + +export class QueryClient extends EventEmitter { + private pool: QuerySessionsPool; + + constructor(settings: IQueryClientSettings) { + super(); + this.pool = new QuerySessionsPool(settings); + } + + private async do(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { + return this.pool.withSession(callback, timeout); + } + +/* + private async withSession(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { + return this.pool.withSession(callback, timeout); + } + + private async withSessionRetry(callback: (session: QuerySession) => Promise, timeout: number = 0, maxRetries = 10): Promise { + return this.pool.withSessionRetry(callback, timeout, maxRetries); + } +*/ + + public async destroy() { + await this.pool.destroy(); + } +} diff --git a/src/query/query-utils.ts b/src/query/query-utils.ts new file mode 100644 index 00000000..7ee9f2bb --- /dev/null +++ b/src/query/query-utils.ts @@ -0,0 +1,19 @@ +import {MissingValue, StatusCode, YdbError} from "../errors"; +import {Ydb} from "ydb-sdk-proto"; + +export interface QueryAsyncResponse { + status: Ydb.StatusIds.StatusCode; + issues: Ydb.Issue.IIssueMessage[]; +} + +export function ensureOperationSucceeded(response: T, suppressedErrors: StatusCode[] = []): T { + try { + YdbError.checkStatus(response); + } catch (error) { + const e = error as any; + if (!(suppressedErrors.indexOf(e.constructor.status) > -1 || e instanceof MissingValue)) { + throw e; + } + } + return response; +} diff --git a/src/scheme.ts b/src/scheme.ts index f1775a16..a50aa8dc 100644 --- a/src/scheme.ts +++ b/src/scheme.ts @@ -1,8 +1,6 @@ import {Ydb} from "ydb-sdk-proto"; import { AuthenticatedService, - getOperationPayload, - ensureOperationSucceeded, pessimizable, ClientOptions } from "./utils"; @@ -22,6 +20,7 @@ import IMakeDirectoryRequest = Ydb.Scheme.IMakeDirectoryRequest; import IPermissions = Ydb.Scheme.IPermissions; import {util} from "protobufjs"; import EventEmitter = util.EventEmitter; +import {ensureOperationSucceeded, getOperationPayload} from "./table/table-utils"; function preparePermissions(action?: IPermissions | null) { diff --git a/src/table/table-session.ts b/src/table/table-session.ts index 6f2127e2..11aaae66 100644 --- a/src/table/table-session.ts +++ b/src/table/table-session.ts @@ -1,7 +1,7 @@ import EventEmitter from 'events'; import * as grpc from '@grpc/grpc-js'; import {google, Ydb} from 'ydb-sdk-proto'; -import {AsyncResponse, ensureOperationSucceeded, getOperationPayload, pessimizable, StreamEnd,} from '../utils'; +import {pessimizable, StreamEnd} from '../utils'; import {Endpoint} from '../discovery'; import {ResponseMetadataKeys} from '../constants'; import {Logger} from '../logging'; @@ -28,6 +28,7 @@ import TypedValue = Ydb.TypedValue; import BulkUpsertResult = Ydb.Table.BulkUpsertResult; import OperationMode = Ydb.Operations.OperationParams.OperationMode; import {SessionEvent} from "./session-event"; +import {ensureOperationSucceeded, getOperationPayload, TableAsyncResponse} from "./table-utils"; interface PartialResponse { status?: (Ydb.StatusIds.StatusCode|null); @@ -257,6 +258,7 @@ export class ExecuteScanQuerySettings { } export class TableSession extends EventEmitter implements ICreateSessionResult { + // TODO: Allocate common functionality with querySession to a sessionBase class. It's likely that commo sessionsPool code will work both Table and Query private beingDeleted = false; private free = true; private closing = false; @@ -299,7 +301,7 @@ export class TableSession extends EventEmitter implements ICreateSessionResult { return Promise.resolve(); } this.beingDeleted = true; - ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); // TODO: Should not session delete has retry? + ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); // TODO: Shouldn't session delete has retry? } @retryable() @@ -520,7 +522,7 @@ export class TableSession extends EventEmitter implements ICreateSessionResult { private processResponseMetadata( request: object, - response: AsyncResponse, + response: TableAsyncResponse, onResponseMetadata?: (metadata: grpc.Metadata) => void ) { const metadata = this.getResponseMetadata(request); diff --git a/src/table/table-sessions-pool.ts b/src/table/table-sessions-pool.ts index 13bfd4c6..09190146 100644 --- a/src/table/table-sessions-pool.ts +++ b/src/table/table-sessions-pool.ts @@ -2,7 +2,7 @@ import {Ydb} from 'ydb-sdk-proto'; import {IAuthService} from "../credentials"; import {ISslCredentials} from "../ssl-credentials"; import {IPoolSettings} from "../driver"; -import {AuthenticatedService, ClientOptions, getOperationPayload, pessimizable} from "../utils"; +import {AuthenticatedService, ClientOptions, pessimizable} from "../utils"; import DiscoveryService, {Endpoint} from "../discovery"; import {Logger} from "../logging"; import EventEmitter from "events"; @@ -15,6 +15,7 @@ import {SessionEvent} from "./session-event"; import TableService = Ydb.Table.V1.TableService; import CreateSessionRequest = Ydb.Table.CreateSessionRequest; import CreateSessionResult = Ydb.Table.CreateSessionResult; +import {getOperationPayload} from "./table-utils"; export class TableSessionBuilder extends AuthenticatedService { public endpoint: Endpoint; diff --git a/src/table/table-utils.ts b/src/table/table-utils.ts new file mode 100644 index 00000000..e879856e --- /dev/null +++ b/src/table/table-utils.ts @@ -0,0 +1,36 @@ +import {MissingOperation, MissingValue, StatusCode, YdbError} from "../errors"; +import {Ydb} from "ydb-sdk-proto"; + +export interface TableAsyncResponse { + operation?: Ydb.Operations.IOperation | null +} + +export function getOperationPayload(response: TableAsyncResponse): Uint8Array { + const {operation} = response; + + if (operation) { + YdbError.checkStatus(operation); + const value = operation?.result?.value; + if (!value) { + throw new MissingValue('Missing operation result value!'); + } + return value; + } else { + throw new MissingOperation('No operation in response!'); + } +} + +export function ensureOperationSucceeded(response: TableAsyncResponse, suppressedErrors: StatusCode[] = []): void { + try { + getOperationPayload(response); + } catch (error) { + const e = error as any; + if (suppressedErrors.indexOf(e.constructor.status) > -1) { + return; + } + + if (!(e instanceof MissingValue)) { + throw e; + } + } +} diff --git a/src/utils.ts b/src/utils.ts index 8fc4fe86..69839d60 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,9 +1,8 @@ import * as grpc from '@grpc/grpc-js'; import * as $protobuf from 'protobufjs'; import _ from 'lodash'; -import {Ydb} from 'ydb-sdk-proto'; import Long from 'long'; -import {MissingOperation, MissingValue, NotFound, StatusCode, TimeoutExpired, YdbError} from "./errors"; +import {NotFound, TimeoutExpired} from "./errors"; import {Endpoint} from './discovery'; import {IAuthService} from './credentials'; @@ -154,40 +153,6 @@ export abstract class AuthenticatedService { } } -export interface AsyncResponse { - operation?: Ydb.Operations.IOperation | null -} - -export function getOperationPayload(response: AsyncResponse): Uint8Array { - const {operation} = response; - - if (operation) { - YdbError.checkStatus(operation); - const value = operation?.result?.value; - if (!value) { - throw new MissingValue('Missing operation result value!'); - } - return value; - } else { - throw new MissingOperation('No operation in response!'); - } -} - -export function ensureOperationSucceeded(response: AsyncResponse, suppressedErrors: StatusCode[] = []): void { - try { - getOperationPayload(response); - } catch (error) { - const e = error as any; - if (suppressedErrors.indexOf(e.constructor.status) > -1) { - return; - } - - if (!(e instanceof MissingValue)) { - throw e; - } - } -} - export function pessimizable(_target: Pessimizable, _propertyKey: string, descriptor: PropertyDescriptor) { const originalMethod = descriptor.value; descriptor.value = async function (this: Pessimizable, ...args: any) {