From f7f486aa59d8b0700b8c538dfd25c08f46b48598 Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Sat, 27 Jan 2024 06:54:07 +0300 Subject: [PATCH] Revert "wip" This reverts commit 19c26f9a4d9a43fac8b21e2d19b89dcd61279488. Revert "protos update. and release changed from tag 'latest' to 'beta'" This reverts commit 45a065cf75a979674a62b31b4d527fe881d9ca53. Revert "wip" This reverts commit 21a4bea83a85dcdf607d9e82f2b322a3415f2a9a. --- .github/workflows/release.yml | 1 - CHANGELOG.md | 6 +- package-lock.json | 14 +- package.json | 2 +- query/index.ts | 1 - query/query-client.ts | 68 --- query/query-session.ts | 21 - src/__tests__/alter-table.test.ts | 4 +- src/__tests__/bulk-upsert.test.ts | 4 +- src/__tests__/bytestring-identity.test.ts | 6 +- src/__tests__/create-table.test.ts | 2 +- src/__tests__/read-table.test.ts | 4 +- src/__tests__/scan-query.test.ts | 4 +- src/driver.ts | 18 +- src/index.ts | 17 +- src/scheme.ts | 4 +- src/{table/table-session.ts => table.ts} | 549 ++++++++++++++++++---- src/table/index.ts | 4 - src/table/session-pool.ts | 230 --------- src/table/session.ts | 126 ----- src/table/table-client.ts | 73 --- src/test-utils.ts | 6 +- 22 files changed, 496 insertions(+), 668 deletions(-) delete mode 100644 query/index.ts delete mode 100644 query/query-client.ts delete mode 100644 query/query-session.ts rename src/{table/table-session.ts => table.ts} (66%) delete mode 100644 src/table/index.ts delete mode 100644 src/table/session-pool.ts delete mode 100644 src/table/session.ts delete mode 100644 src/table/table-client.ts diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 43c0a1a6..dd9209ce 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,4 +13,3 @@ jobs: with: github-token: ${{ secrets.YDB_PLATFORM_BOT_TOKEN_REPO }} npm-token: ${{ secrets.NODE_AUTH_TOKEN }} - npm-dist-tag: rc diff --git a/CHANGELOG.md b/CHANGELOG.md index 333edd4f..f612ceac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -286,7 +286,7 @@ All notable changes to this project will be documented in this file. See [standa - decimal value is present as string instead of bigint (it wasn't working for float values before) - fix uuid and tz-date types conversion (it wasn't working before) -* signatures of most methods in TableSession are changed: +* signatures of most methods in Session are changed: - executeQuery Before: `(query, params, txControl, operationParams?, settings?, collectStats?)` After: `(query, params, txControl, settings?)` @@ -336,7 +336,7 @@ All notable changes to this project will be documented in this file. See [standa * drop support of old environment variables ([963819a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/963819af9209a45749f5118077f1da4bdb390fa6)) * reorganize signature of SchemeClient's methods ([734d57a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/734d57a2dd7c655cf727b96df415212504339cf8)) -* reorganize signatures of TableSession's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) +* reorganize signatures of Session's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) * use identity names conversion in TypedData ([275598a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/275598aa444e1e977386a3dadd02bbc9ba01f38e)) ### [2.9.2](https://www.github.com/ydb-platform/ydb-nodejs-sdk/compare/v2.9.1...v2.9.2) (2022-02-09) @@ -431,7 +431,7 @@ All notable changes to this project will be documented in this file. See [standa * and many other changes in protobufs. ### 1.10.0 -* Add `alterTable` method to TableSession class +* Add `alterTable` method to Session class * Put compiled protobufs to a separate 'ydb-sdk' namespace ### 1.9.0 diff --git a/package-lock.json b/package-lock.json index d87ece2a..fc52bf36 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "luxon": "^3.2.1", "reflect-metadata": "^0.1.13", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.2.1" + "ydb-sdk-proto": "^1.1.0" }, "devDependencies": { "@commitlint/cli": "^17.6.1", @@ -9639,9 +9639,9 @@ } }, "node_modules/ydb-sdk-proto": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", - "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.1.0.tgz", + "integrity": "sha512-O/cmNQJjgv11QM51xj3szS1rU/0qRYqAZF8rlsCDzsK+JQt76+LW8d+N8mzIikjXap53lK4I6s1NudeuT3OviQ==" }, "node_modules/yn": { "version": "3.1.1", @@ -16980,9 +16980,9 @@ "dev": true }, "ydb-sdk-proto": { - "version": "1.2.1", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", - "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.1.0.tgz", + "integrity": "sha512-O/cmNQJjgv11QM51xj3szS1rU/0qRYqAZF8rlsCDzsK+JQt76+LW8d+N8mzIikjXap53lK4I6s1NudeuT3OviQ==" }, "yn": { "version": "3.1.1", diff --git a/package.json b/package.json index 5028b06d..613c964c 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "luxon": "^3.2.1", "reflect-metadata": "^0.1.13", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.2.1" + "ydb-sdk-proto": "^1.1.0" }, "devDependencies": { "@commitlint/cli": "^17.6.1", diff --git a/query/index.ts b/query/index.ts deleted file mode 100644 index a9081699..00000000 --- a/query/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './query-client'; diff --git a/query/query-client.ts b/query/query-client.ts deleted file mode 100644 index 22968b47..00000000 --- a/query/query-client.ts +++ /dev/null @@ -1,68 +0,0 @@ -import {Ydb} from "ydb-sdk-proto"; -import QueryService = Ydb.Query.V1.QueryService; -import CreateSessionRequest = Ydb.Query.CreateSessionRequest; -import CreateSessionResult = Ydb.Query.CreateSessionResult; -import EventEmitter from "events"; -import {IClientSettings, SessionPool} from "./session-pool"; -import {Endpoint} from "../discovery"; -import {IAuthService} from "../credentials"; -import {Logger} from "../logging"; -import {ISslCredentials} from "../ssl-credentials"; -import {AuthenticatedService, ClientOptions, getOperationPayload, pessimizable} from "../utils"; -import {SessionCreator} from "../table"; -import {retryable} from "../retries"; -import {QuerySession} from "./query-session"; - -export class QuerySessionCreator extends AuthenticatedService { - public endpoint: Endpoint; - private readonly logger: Logger; - - constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { - const host = endpoint.toString(); - super(host, database, 'Ydb.Query.V1.QueryService', QueryService, authService, sslCredentials, clientOptions); - this.endpoint = endpoint; - this.logger = logger; - } - - @retryable() - @pessimizable - async create(): Promise { - const response = await this.api.createSession(CreateSessionRequest.create()); - const payload = getOperationPayload(response); - const {sessionId} = CreateSessionResult.decode(payload); - return new QuerySession(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); - } -} - -class QuerySessionPool extends SessionPool { - protected getSessionServiceCreator( - endpoint: Endpoint, - database: string, - authService: IAuthService, - logger: Logger, - sslCredentials: ISslCredentials | undefined, - clientOptions: ClientOptions | undefined): SessionCreator { - return new QuerySessionCreator(endpoint, database, authService, logger, sslCredentials, clientOptions); - } -} - -export class QueryClient extends EventEmitter { - private pool: QuerySessionPool; - - constructor(settings: IClientSettings) { - super(); - this.pool = new QuerySessionPool(settings); - } - - public async withSession(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { - return this.pool.withSession(callback, timeout); - } - - public async withSessionRetry(callback: (session: QuerySession) => Promise, timeout: number = 0, maxRetries = 10): Promise { - return this.pool.withSessionRetry(callback, timeout, maxRetries); - } - - public async destroy() { - await this.pool.destroy(); - } -} diff --git a/query/query-session.ts b/query/query-session.ts deleted file mode 100644 index eed142cd..00000000 --- a/query/query-session.ts +++ /dev/null @@ -1,21 +0,0 @@ -import {Session} from "../table"; -import {Endpoint} from "../discovery"; -import {Logger} from "../logging"; -import * as grpc from "@grpc/grpc-js"; -import {Ydb} from "ydb-sdk-proto"; -import QueryService = Ydb.Query.V1.QueryService; - -export class QuerySession extends Session { - - constructor( - api: QueryService, - endpoint: Endpoint, - sessionId: string, - logger: Logger, - getResponseMetadata: (request: object) => grpc.Metadata | undefined - ) { - super(api, endpoint, sessionId, logger, getResponseMetadata); - } - - // TODO: Add methods -} diff --git a/src/__tests__/alter-table.test.ts b/src/__tests__/alter-table.test.ts index 9dc40332..87c541b4 100644 --- a/src/__tests__/alter-table.test.ts +++ b/src/__tests__/alter-table.test.ts @@ -4,11 +4,11 @@ import { AlterTableDescription, AlterTableSettings, Column, + OperationParams, TableDescription, TableIndex, -} from '../table/table-session'; +} from '../table'; import { Types } from '../types'; -import {OperationParams} from "../table/session"; const getTableName = () => `table_alter_${Math.trunc(1000 * Math.random())}`; diff --git a/src/__tests__/bulk-upsert.test.ts b/src/__tests__/bulk-upsert.test.ts index b10dd4b1..245a4056 100644 --- a/src/__tests__/bulk-upsert.test.ts +++ b/src/__tests__/bulk-upsert.test.ts @@ -7,10 +7,10 @@ import { Row, TABLE } from '../test-utils'; -import {TableSession} from '../table/table-session'; +import {Session} from '../table'; import {Ydb} from 'ydb-sdk-proto'; -async function readTable(session: TableSession): Promise { +async function readTable(session: Session): Promise { const rows: Row[] = []; await session.streamReadTable(TABLE, (result) => { diff --git a/src/__tests__/bytestring-identity.test.ts b/src/__tests__/bytestring-identity.test.ts index ec2819c5..c2c4f51b 100644 --- a/src/__tests__/bytestring-identity.test.ts +++ b/src/__tests__/bytestring-identity.test.ts @@ -1,10 +1,10 @@ import Driver from '../driver'; import {destroyDriver, initDriver, TABLE} from '../test-utils'; -import {Column, TableSession, TableDescription} from '../table/table-session'; +import {Column, Session, TableDescription} from '../table'; import {declareType, TypedData, Types} from '../types'; import {withRetries} from '../retries'; -async function createTable(session: TableSession) { +async function createTable(session: Session) { await session.dropTable(TABLE); await session.createTable( TABLE, @@ -46,7 +46,7 @@ class Row extends TypedData { } } -export async function fillTableWithData(session: TableSession, rows: Row[]) { +export async function fillTableWithData(session: Session, rows: Row[]) { const query = ` DECLARE $data AS List>; diff --git a/src/__tests__/create-table.test.ts b/src/__tests__/create-table.test.ts index 939352e3..4933a011 100644 --- a/src/__tests__/create-table.test.ts +++ b/src/__tests__/create-table.test.ts @@ -1,6 +1,6 @@ import Driver from '../driver'; import {destroyDriver, initDriver} from '../test-utils'; -import {Column, DescribeTableSettings, TableDescription} from '../table/table-session'; +import {Column, DescribeTableSettings, TableDescription} from '../table'; import {TypedValues, Types} from '../types'; import Long from 'long'; import {Ydb} from 'ydb-sdk-proto'; diff --git a/src/__tests__/read-table.test.ts b/src/__tests__/read-table.test.ts index b83af987..70047334 100644 --- a/src/__tests__/read-table.test.ts +++ b/src/__tests__/read-table.test.ts @@ -7,10 +7,10 @@ import { Row, TABLE } from '../test-utils'; -import {ReadTableSettings, TableSession} from '../table/table-session'; +import {ReadTableSettings, Session} from '../table'; import {TypedValues, TypedData} from '../types'; -async function readTable(session: TableSession, settings: ReadTableSettings): Promise { +async function readTable(session: Session, settings: ReadTableSettings): Promise { const rows: TypedData[] = []; await session.streamReadTable(TABLE, (result) => { diff --git a/src/__tests__/scan-query.test.ts b/src/__tests__/scan-query.test.ts index befba963..04dbc428 100644 --- a/src/__tests__/scan-query.test.ts +++ b/src/__tests__/scan-query.test.ts @@ -7,10 +7,10 @@ import { initDriver, Row, } from '../test-utils'; -import {TableSession} from '../table/table-session'; +import {Session} from '../table'; import {TypedData} from '../types'; -async function executeScanQuery(session: TableSession): Promise { +async function executeScanQuery(session: Session): Promise { const query = `SELECT * FROM ${TABLE};`; const rows: TypedData[] = []; diff --git a/src/driver.ts b/src/driver.ts index ce82390a..35d34b3b 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -1,14 +1,14 @@ import DiscoveryService from './discovery'; -import {SchemeClient} from './scheme'; -// import {QueryClient} from './query/query-client'; +import {TableClient} from './table'; +import SchemeService from './scheme'; import {ENDPOINT_DISCOVERY_PERIOD} from './constants'; import {IAuthService} from './credentials'; import {TimeoutExpired} from './errors'; import {getLogger, Logger} from './logging'; +import SchemeClient from './scheme'; import {ClientOptions} from './utils'; import {parseConnectionString} from './parse-connection-string'; import {makeSslCredentials, ISslCredentials} from './ssl-credentials'; -import {TableClient} from "./table/table-client"; export interface IPoolSettings { minLimit?: number; @@ -38,8 +38,7 @@ export default class Driver { private discoveryService: DiscoveryService; public tableClient: TableClient; - public schemeClient: SchemeClient; - // public queryClient: QueryClient; + public schemeClient: SchemeService; constructor(settings: IDriverSettings) { this.logger = settings.logger || getLogger(); @@ -80,15 +79,6 @@ export default class Driver { discoveryService: this.discoveryService, logger: this.logger, }); - // this.queryClient = new QueryClient({ - // database: this.database, - // authService: this.authService, - // sslCredentials: this.sslCredentials, - // poolSettings: this.poolSettings, - // clientOptions: this.clientOptions, - // discoveryService: this.discoveryService, - // logger: this.logger, - // }); this.schemeClient = new SchemeClient({ database: this.database, authService: this.authService, diff --git a/src/index.ts b/src/index.ts index c8f112e9..b381adc8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,10 +26,14 @@ export { StringFunction, } from './types'; export { - TableSession, + SessionPool, + Session, CreateTableSettings, AlterTableSettings, DropTableSettings, + BeginTransactionSettings, + CommitTransactionSettings, + RollbackTransactionSettings, DescribeTableSettings, PrepareQuerySettings, ExecuteQuerySettings, @@ -50,8 +54,9 @@ export { CompactionPolicy, ExecutionPolicy, CachingPolicy, - -} from './table/table-session'; + OperationParams, + AUTO_TX, +} from './table'; export { MakeDirectorySettings, RemoveDirectorySettings, @@ -73,9 +78,3 @@ export { export {ISslCredentials} from './ssl-credentials'; export {withRetries, RetryParameters} from './retries'; export {YdbError, StatusCode} from './errors'; -export {SessionPool} from "./table/session-pool"; -export {RollbackTransactionSettings} from "./table/session"; -export {CommitTransactionSettings} from "./table/session"; -export {BeginTransactionSettings} from "./table/session"; -export {OperationParams} from "./table/session"; -export {AUTO_TX} from "./table/session"; diff --git a/src/scheme.ts b/src/scheme.ts index 321a6095..bf9bda06 100644 --- a/src/scheme.ts +++ b/src/scheme.ts @@ -12,6 +12,7 @@ import {Logger} from './logging'; import DiscoveryService, {Endpoint} from './discovery'; import {retryable} from "./retries"; import {ISslCredentials} from './ssl-credentials'; +import {OperationParamsSettings} from './table'; import SchemeServiceAPI = Ydb.Scheme.V1.SchemeService; import ListDirectoryResult = Ydb.Scheme.ListDirectoryResult; @@ -21,7 +22,6 @@ import IMakeDirectoryRequest = Ydb.Scheme.IMakeDirectoryRequest; import IPermissions = Ydb.Scheme.IPermissions; import {util} from "protobufjs"; import EventEmitter = util.EventEmitter; -import {OperationParamsSettings} from "./table/session"; function preparePermissions(action?: IPermissions | null) { @@ -70,7 +70,7 @@ interface ISchemeClientSettings { logger: Logger; } -export class SchemeClient extends EventEmitter { +export default class SchemeClient extends EventEmitter { private schemeServices: Map; constructor(private settings: ISchemeClientSettings) { diff --git a/src/table/table-session.ts b/src/table.ts similarity index 66% rename from src/table/table-session.ts rename to src/table.ts index 4189f348..6e6ad7ca 100644 --- a/src/table/table-session.ts +++ b/src/table.ts @@ -1,24 +1,47 @@ +import _ from 'lodash'; +import EventEmitter from 'events'; import * as grpc from '@grpc/grpc-js'; import {google, Ydb} from 'ydb-sdk-proto'; -import BeginTransactionResult = Ydb.Table.BeginTransactionResult; -import ITransactionSettings = Ydb.Table.ITransactionSettings; -import ITransactionMeta = Ydb.Table.ITransactionMeta; -import {ensureOperationSucceeded, getOperationPayload, pessimizable,} from '../utils'; -import {Endpoint} from '../discovery'; -import {Logger} from '../logging'; -import {retryable} from '../retries'; -import {MissingStatus, SchemeError} from '../errors'; import { - OperationMode, - Session -} from "./session"; + AuthenticatedService, + ClientOptions, + StreamEnd, + ensureOperationSucceeded, + getOperationPayload, + pessimizable, + AsyncResponse, +} from './utils'; +import DiscoveryService, {Endpoint} from './discovery'; +import {IPoolSettings} from './driver'; +import {ISslCredentials} from './ssl-credentials'; +import {Events, ResponseMetadataKeys, SESSION_KEEPALIVE_PERIOD} from './constants'; +import {IAuthService} from './credentials'; +// noinspection ES6PreferShortImport +import {Logger} from './logging'; +import {retryable} from './retries'; +import { + SchemeError, + SessionPoolEmpty, + BadSession, + SessionBusy, + MissingValue, + YdbError, + MissingStatus, +} from './errors'; + import TableService = Ydb.Table.V1.TableService; +import CreateSessionRequest = Ydb.Table.CreateSessionRequest; +import ICreateSessionResult = Ydb.Table.ICreateSessionResult; +import CreateSessionResult = Ydb.Table.CreateSessionResult; import IQuery = Ydb.Table.IQuery; import IType = Ydb.IType; import DescribeTableResult = Ydb.Table.DescribeTableResult; import PrepareQueryResult = Ydb.Table.PrepareQueryResult; import ExecuteQueryResult = Ydb.Table.ExecuteQueryResult; -import ExplainQueryResult = Ydb.Table.ExplainQueryResult; +import ExplainQueryResult = Ydb.Table.ExplainQueryResult +import ITransactionSettings = Ydb.Table.ITransactionSettings; +import BeginTransactionResult = Ydb.Table.BeginTransactionResult; +import ITransactionMeta = Ydb.Table.ITransactionMeta; import AutoPartitioningPolicy = Ydb.Table.PartitioningPolicy.AutoPartitioningPolicy; import ITypedValue = Ydb.ITypedValue; import FeatureFlag = Ydb.FeatureFlag.Status; @@ -27,12 +50,44 @@ import ExecuteScanQueryPartialResult = Ydb.Table.ExecuteScanQueryPartialResult; import IKeyRange = Ydb.Table.IKeyRange; import TypedValue = Ydb.TypedValue; import BulkUpsertResult = Ydb.Table.BulkUpsertResult; +import OperationMode = Ydb.Operations.OperationParams.OperationMode; -export interface IExistingTransaction { - txId: string +interface PartialResponse { + status?: (Ydb.StatusIds.StatusCode|null); + issues?: (Ydb.Issue.IIssueMessage[]|null); + result?: (T|null); +} + +export class SessionService extends AuthenticatedService { + public endpoint: Endpoint; + private readonly logger: Logger; + + constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { + const host = endpoint.toString(); + super(host, database, 'Ydb.Table.V1.TableService', TableService, authService, sslCredentials, clientOptions); + this.endpoint = endpoint; + this.logger = logger; + } + + @retryable() + @pessimizable + async create(): Promise { + const response = await this.api.createSession(CreateSessionRequest.create()); + const payload = getOperationPayload(response); + const {sessionId} = CreateSessionResult.decode(payload); + return new Session(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); + } } -export interface INewTransaction { +enum SessionEvent { + SESSION_RELEASE = 'SESSION_RELEASE', + SESSION_BROKEN = 'SESSION_BROKEN' +} + +interface IExistingTransaction { + txId: string +} +interface INewTransaction { beginTx: ITransactionSettings, commitTx: boolean } @@ -44,6 +99,10 @@ export const AUTO_TX: INewTransaction = { commitTx: true }; +interface IQueryParams { + [k: string]: Ydb.ITypedValue +} + export class OperationParams implements Ydb.Operations.IOperationParams { operationMode?: OperationMode; operationTimeout?: google.protobuf.IDuration; @@ -81,7 +140,7 @@ export class OperationParams implements Ydb.Operations.IOperationParams { return this; } - withLabels(labels: { [k: string]: string }) { + withLabels(labels: {[k: string]: string}) { this.labels = labels; return this; } @@ -101,25 +160,6 @@ export class OperationParamsSettings { } } -export class BeginTransactionSettings extends OperationParamsSettings { -} - -export class CommitTransactionSettings extends OperationParamsSettings { - collectStats?: Ydb.Table.QueryStatsCollection.Mode; - - withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { - this.collectStats = collectStats; - return this; - } -} - -export class RollbackTransactionSettings extends OperationParamsSettings { -} - -interface IQueryParams { - [k: string]: Ydb.ITypedValue -} - export class CreateTableSettings extends OperationParamsSettings { } @@ -129,7 +169,6 @@ export class AlterTableSettings extends OperationParamsSettings { interface IDropTableSettings { muteNonExistingTableErrors: boolean; } - export class DropTableSettings extends OperationParamsSettings { muteNonExistingTableErrors: boolean; @@ -160,6 +199,21 @@ export class DescribeTableSettings extends OperationParamsSettings { } } +export class BeginTransactionSettings extends OperationParamsSettings { +} + +export class CommitTransactionSettings extends OperationParamsSettings { + collectStats?: Ydb.Table.QueryStatsCollection.Mode; + + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { + this.collectStats = collectStats; + return this; + } +} + +export class RollbackTransactionSettings extends OperationParamsSettings { +} + export class PrepareQuerySettings extends OperationParamsSettings { } @@ -251,16 +305,58 @@ export class ExecuteScanQuerySettings { } } -export class TableSession extends Session { +export class Session extends EventEmitter implements ICreateSessionResult { + private beingDeleted = false; + private free = true; + private closing = false; constructor( - api: TableService, - endpoint: Endpoint, - sessionId: string, - logger: Logger, - getResponseMetadata: (request: object) => grpc.Metadata | undefined + private api: TableService, + public endpoint: Endpoint, + public sessionId: string, + private logger: Logger, + private getResponseMetadata: (request: object) => grpc.Metadata | undefined ) { - super(api, endpoint, sessionId, logger, getResponseMetadata); + super(); + } + + acquire() { + this.free = false; + this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + return this; + } + release() { + this.free = true; + this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + this.emit(SessionEvent.SESSION_RELEASE, this); + } + + public isFree() { + return this.free && !this.isDeleted(); + } + public isClosing() { + return this.closing; + } + public isDeleted() { + return this.beingDeleted; + } + + @retryable() + @pessimizable + public async delete(): Promise { + if (this.isDeleted()) { + return Promise.resolve(); + } + this.beingDeleted = true; + ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); + } + + @retryable() + @pessimizable + public async keepAlive(): Promise { + const request = {sessionId: this.sessionId}; + const response = await this.api.keepAlive(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); } @retryable() @@ -364,6 +460,57 @@ export class TableSession extends Session { return Ydb.Table.DescribeTableOptionsResult.decode(payload); } + @retryable() + @pessimizable + public async beginTransaction( + txSettings: ITransactionSettings, + settings?: BeginTransactionSettings, + ): Promise { + const request: Ydb.Table.IBeginTransactionRequest = { + sessionId: this.sessionId, + txSettings, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.beginTransaction(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const {txMeta} = BeginTransactionResult.decode(payload); + if (txMeta) { + return txMeta; + } + throw new Error('Could not begin new transaction, txMeta is empty!'); + } + + @retryable() + @pessimizable + public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { + const request: Ydb.Table.ICommitTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + request.collectStats = settings.collectStats; + } + const response = await this.api.commitTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { + const request: Ydb.Table.IRollbackTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.rollbackTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + @retryable() @pessimizable public async prepareQuery(queryText: string, settings?: PrepareQuerySettings): Promise { @@ -420,6 +567,22 @@ export class TableSession extends Session { return ExecuteQueryResult.decode(payload); } + private processResponseMetadata( + request: object, + response: AsyncResponse, + onResponseMetadata?: (metadata: grpc.Metadata) => void + ) { + const metadata = this.getResponseMetadata(request); + if (metadata) { + const serverHints = metadata.get(ResponseMetadataKeys.ServerHints) || []; + if (serverHints.includes('session-close')) { + this.closing = true; + } + onResponseMetadata?.(metadata); + } + return response; + } + @pessimizable public async bulkUpsert(tablePath: string, rows: TypedValue, settings?: BulkUpsertSettings) { const request: Ydb.Table.IBulkUpsertRequest = { @@ -491,6 +654,43 @@ export class TableSession extends Session { consumer); } + private executeStreamRequest, IRes, Res>( + request: Req, + apiStreamMethod: (request: Req, callback: (error: (Error|null), response?: Resp) => void) => void, + transformer: (result: IRes) => Res, + consumer: (result: Res) => void) + : Promise { + return new Promise((resolve, reject) => { + apiStreamMethod(request, (error, response) => { + try { + if (error) { + if (error instanceof StreamEnd) { + resolve(); + } else { + reject(error); + } + } else if (response) { + const operation = { + status: response.status, + issues: response.issues, + } as Ydb.Operations.IOperation; + YdbError.checkStatus(operation); + + if (!response.result) { + reject(new MissingValue('Missing result value!')); + return; + } + + const result = transformer(response.result); + consumer(result); + } + } catch (e) { + reject(e); + } + }); + }); + } + public async explainQuery(query: string, operationParams?: Ydb.Operations.IOperationParams): Promise { const request: Ydb.Table.IExplainDataQueryRequest = { sessionId: this.sessionId, @@ -501,67 +701,230 @@ export class TableSession extends Session { const payload = getOperationPayload(this.processResponseMetadata(request, response)); return ExplainQueryResult.decode(payload); } +} - @retryable() - @pessimizable - public async beginTransaction( - txSettings: ITransactionSettings, - settings?: BeginTransactionSettings, - ): Promise { - const request: Ydb.Table.IBeginTransactionRequest = { - sessionId: this.sessionId, - txSettings, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.beginTransaction(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - const {txMeta} = BeginTransactionResult.decode(payload); - if (txMeta) { - return txMeta; +type SessionCallback = (session: Session) => Promise; + +interface ITableClientSettings { + database: string; + authService: IAuthService; + sslCredentials?: ISslCredentials; + poolSettings?: IPoolSettings; + clientOptions?: ClientOptions; + discoveryService: DiscoveryService; + logger: Logger; +} + +export class SessionPool extends EventEmitter { + private readonly database: string; + private readonly authService: IAuthService; + private readonly sslCredentials?: ISslCredentials; + private readonly clientOptions?: ClientOptions; + private readonly minLimit: number; + private readonly maxLimit: number; + private readonly sessions: Set; + private readonly sessionCreators: Map; + private readonly discoveryService: DiscoveryService; + private newSessionsRequested: number; + private sessionsBeingDeleted: number; + private readonly sessionKeepAliveId: NodeJS.Timeout; + private readonly logger: Logger; + private readonly waiters: ((session: Session) => void)[] = []; + + private static SESSION_MIN_LIMIT = 5; + private static SESSION_MAX_LIMIT = 20; + + constructor(settings: ITableClientSettings) { + super(); + this.database = settings.database; + this.authService = settings.authService; + this.sslCredentials = settings.sslCredentials; + this.clientOptions = settings.clientOptions; + this.logger = settings.logger; + const poolSettings = settings.poolSettings; + this.minLimit = poolSettings?.minLimit || SessionPool.SESSION_MIN_LIMIT; + this.maxLimit = poolSettings?.maxLimit || SessionPool.SESSION_MAX_LIMIT; + this.sessions = new Set(); + this.newSessionsRequested = 0; + this.sessionsBeingDeleted = 0; + this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); + this.sessionCreators = new Map(); + this.discoveryService = settings.discoveryService; + this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { + this.sessionCreators.delete(endpoint); + }); + this.prepopulateSessions(); + } + + public async destroy(): Promise { + this.logger.debug('Destroying pool...'); + clearInterval(this.sessionKeepAliveId); + await Promise.all(_.map([...this.sessions], (session: Session) => this.deleteSession(session))); + this.logger.debug('Pool has been destroyed.'); + } + + private initListeners(keepAlivePeriod: number) { + return setInterval(async () => Promise.all( + _.map([...this.sessions], (session: Session) => { + return session.keepAlive() + // delete session if error + .catch(() => this.deleteSession(session)) + // ignore errors to avoid UnhandledPromiseRejectionWarning + .catch(() => Promise.resolve()) + }) + ), keepAlivePeriod); + } + + private prepopulateSessions() { + _.forEach(_.range(this.minLimit), () => this.createSession()); + } + + private async getSessionCreator(): Promise { + const endpoint = await this.discoveryService.getEndpoint(); + if (!this.sessionCreators.has(endpoint)) { + const sessionService = new SessionService(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); + this.sessionCreators.set(endpoint, sessionService); } - throw new Error('Could not begin new transaction, txMeta is empty!'); + return this.sessionCreators.get(endpoint) as SessionService; } - @retryable() - @pessimizable - public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { - const request: Ydb.Table.ICommitTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; - request.collectStats = settings.collectStats; + private maybeUseSession(session: Session) { + if (this.waiters.length > 0) { + const waiter = this.waiters.shift(); + if (typeof waiter === "function") { + waiter(session); + return true; + } } - const response = await this.api.commitTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + return false; } - @retryable() - @pessimizable - public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { - const request: Ydb.Table.IRollbackTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; + private async createSession(): Promise { + const sessionCreator = await this.getSessionCreator(); + const session = await sessionCreator.create(); + session.on(SessionEvent.SESSION_RELEASE, async () => { + if (session.isClosing()) { + await this.deleteSession(session); + } else { + this.maybeUseSession(session); + } + }) + session.on(SessionEvent.SESSION_BROKEN, async () => { + await this.deleteSession(session); + }); + this.sessions.add(session); + return session; + } + + private deleteSession(session: Session): Promise { + if (session.isDeleted()) { + return Promise.resolve(); + } + + this.sessionsBeingDeleted++; + // acquire new session as soon one of existing ones is deleted + if (this.waiters.length > 0) { + this.acquire().then((session) => { + if (!this.maybeUseSession(session)) { + session.release(); + } + }); + } + return session.delete() + // delete session in any case + .finally(() => { + this.sessions.delete(session); + this.sessionsBeingDeleted--; + }); + } + + private acquire(timeout: number = 0): Promise { + for (const session of this.sessions) { + if (session.isFree()) { + return Promise.resolve(session.acquire()); + } + } + + if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { + this.newSessionsRequested++; + return this.createSession() + .then((session) => { + return session.acquire(); + }) + .finally(() => { + this.newSessionsRequested--; + }); + } else { + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout; + function waiter(session: Session) { + clearTimeout(timeoutId); + resolve(session.acquire()); + } + if (timeout) { + timeoutId = setTimeout(() => { + this.waiters.splice(this.waiters.indexOf(waiter), 1); + reject( + new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) + ); + }, timeout); + } + this.waiters.push(waiter); + }); } - const response = await this.api.rollbackTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); } - @retryable() - @pessimizable - public async keepAlive(): Promise { - if (typeof this.api.keepAlive !== 'function') { - throw new Error('This service does not support keep alive method'); + private async _withSession(session: Session, callback: SessionCallback, maxRetries = 0): Promise { + try { + const result = await callback(session); + session.release(); + return result; + } catch (error) { + if (error instanceof BadSession || error instanceof SessionBusy) { + this.logger.debug('Encountered bad or busy session, re-creating the session'); + session.emit(SessionEvent.SESSION_BROKEN); + session = await this.createSession(); + if (maxRetries > 0) { + this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); + session.acquire(); + return this._withSession(session, callback, maxRetries - 1); + } + } else { + session.release(); + } + throw error; } - const request = {sessionId: this.sessionId}; - const response = await this.api.keepAlive(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + public async withSession(callback: SessionCallback, timeout: number = 0): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback); + } + + public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback, maxRetries); + } +} + +export class TableClient extends EventEmitter { + private pool: SessionPool; + + constructor(settings: ITableClientSettings) { + super(); + this.pool = new SessionPool(settings); + } + + public async withSession(callback: (session: Session) => Promise, timeout: number = 0): Promise { + return this.pool.withSession(callback, timeout); + } + + public async withSessionRetry(callback: (session: Session) => Promise, timeout: number = 0, maxRetries = 10): Promise { + return this.pool.withSessionRetry(callback, timeout, maxRetries); + } + + public async destroy() { + await this.pool.destroy(); } } diff --git a/src/table/index.ts b/src/table/index.ts deleted file mode 100644 index 72657904..00000000 --- a/src/table/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export * from './session-pool'; -export * from './session'; -export * from './table-client'; -export * from './table-session'; diff --git a/src/table/session-pool.ts b/src/table/session-pool.ts deleted file mode 100644 index b508e397..00000000 --- a/src/table/session-pool.ts +++ /dev/null @@ -1,230 +0,0 @@ -import EventEmitter from "events"; -import {IAuthService} from "../credentials"; -import {ISslCredentials} from "../ssl-credentials"; -import {ClientOptions} from "../utils"; -import DiscoveryService, {Endpoint} from "../discovery"; -import {Logger} from "../logging"; -import {Events, SESSION_KEEPALIVE_PERIOD} from "../constants"; -import _ from "lodash"; -import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors"; -import {IPoolSettings} from "../driver"; -import {Session, SessionCreator} from "./session"; - -export enum SessionEvent { - SESSION_RELEASE = 'SESSION_RELEASE', - SESSION_BROKEN = 'SESSION_BROKEN' -} - -type SessionCallback, T> = (session: SessionType) => Promise; - -export interface IClientSettings { - database: string; - authService: IAuthService; - sslCredentials?: ISslCredentials; - poolSettings?: IPoolSettings; - clientOptions?: ClientOptions; - discoveryService: DiscoveryService; - logger: Logger; -} - -export abstract class SessionPool> extends EventEmitter { - private readonly database: string; - private readonly authService: IAuthService; - private readonly sslCredentials?: ISslCredentials; - private readonly clientOptions?: ClientOptions; - private readonly minLimit: number; - private readonly maxLimit: number; - private readonly sessions: Set; - private readonly sessionCreators: Map>; - private readonly discoveryService: DiscoveryService; - private newSessionsRequested: number; - private sessionsBeingDeleted: number; - private readonly sessionKeepAliveId: NodeJS.Timeout; - private readonly logger: Logger; - private readonly waiters: ((session: SessionType) => void)[] = []; - - private static SESSION_MIN_LIMIT = 5; - private static SESSION_MAX_LIMIT = 20; - - constructor(settings: IClientSettings) { - super(); - this.database = settings.database; - this.authService = settings.authService; - this.sslCredentials = settings.sslCredentials; - this.clientOptions = settings.clientOptions; - this.logger = settings.logger; - const poolSettings = settings.poolSettings; - this.minLimit = poolSettings?.minLimit || SessionPool.SESSION_MIN_LIMIT; - this.maxLimit = poolSettings?.maxLimit || SessionPool.SESSION_MAX_LIMIT; - this.sessions = new Set(); - this.newSessionsRequested = 0; - this.sessionsBeingDeleted = 0; - this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); - this.sessionCreators = new Map(); - this.discoveryService = settings.discoveryService; - this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { - this.sessionCreators.delete(endpoint); - }); - this.prepopulateSessions(); - } - - public async destroy(): Promise { - this.logger.debug('Destroying pool...'); - clearInterval(this.sessionKeepAliveId); - await Promise.all(_.map([...this.sessions], (session: SessionType) => this.deleteSession(session))); - this.logger.debug('Pool has been destroyed.'); - } - - private initListeners(keepAlivePeriod: number) { - return setInterval(async () => Promise.all( - _.map([...this.sessions], (session: SessionType) => { - return session.keepAlive() - // delete session if error - .catch(() => this.deleteSession(session)) - // ignore errors to avoid UnhandledPromiseRejectionWarning - .catch(() => Promise.resolve()) - }) - ), keepAlivePeriod); - } - - private prepopulateSessions() { - _.forEach(_.range(this.minLimit), () => this.createSession()); - } - - protected abstract getSessionServiceCreator( - endpoint: Endpoint, - database: string, - authService: IAuthService, - logger: Logger, - sslCredentials: ISslCredentials | undefined, - clientOptions: ClientOptions | undefined): SessionCreator; - - private async getSessionCreator(): Promise> { - const endpoint = await this.discoveryService.getEndpoint(); - if (!this.sessionCreators.has(endpoint)) { - const sessionService = this.getSessionServiceCreator(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); - this.sessionCreators.set(endpoint, sessionService); - } - return this.sessionCreators.get(endpoint)!; - } - - private maybeUseSession(session: SessionType) { - if (this.waiters.length > 0) { - const waiter = this.waiters.shift(); - if (typeof waiter === "function") { - waiter(session); - return true; - } - } - return false; - } - - private async createSession(): Promise { - const sessionCreator = await this.getSessionCreator(); - const session = await sessionCreator.create() as SessionType; - session.on(SessionEvent.SESSION_RELEASE, async () => { - if (session.isClosing()) { - await this.deleteSession(session); - } else { - this.maybeUseSession(session); - } - }) - session.on(SessionEvent.SESSION_BROKEN, async () => { - await this.deleteSession(session); - }); - this.sessions.add(session); - return session; - } - - private deleteSession(session: SessionType): Promise { - if (session.isDeleted()) { - return Promise.resolve(); - } - - this.sessionsBeingDeleted++; - // acquire new session as soon one of existing ones is deleted - if (this.waiters.length > 0) { - this.acquire().then((session) => { - if (!this.maybeUseSession(session)) { - session.release(); - } - }); - } - return session.delete() - // delete session in any case - .finally(() => { - this.sessions.delete(session); - this.sessionsBeingDeleted--; - }); - } - - private acquire(timeout: number = 0): Promise { - for (const session of this.sessions) { - if (session.isFree()) { - return Promise.resolve(session.acquire()); - } - } - - if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { - this.newSessionsRequested++; - return this.createSession() - .then((session) => { - return session.acquire(); - }) - .finally(() => { - this.newSessionsRequested--; - }); - } else { - return new Promise((resolve, reject) => { - let timeoutId: NodeJS.Timeout; - - function waiter(session: SessionType) { - clearTimeout(timeoutId); - resolve(session.acquire()); - } - - if (timeout) { - timeoutId = setTimeout(() => { - this.waiters.splice(this.waiters.indexOf(waiter), 1); - reject( - new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) - ); - }, timeout); - } - this.waiters.push(waiter); - }); - } - } - - private async _withSession(session: SessionType, callback: SessionCallback, maxRetries = 0): Promise { - try { - const result = await callback(session); - session.release(); - return result; - } catch (error) { - if (error instanceof BadSession || error instanceof SessionBusy) { - this.logger.debug('Encountered bad or busy session, re-creating the session'); - session.emit(SessionEvent.SESSION_BROKEN); - session = await this.createSession(); - if (maxRetries > 0) { - this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); - session.acquire(); - return this._withSession(session, callback, maxRetries - 1); - } - } else { - session.release(); - } - throw error; - } - } - - public async withSession(callback: SessionCallback, timeout: number = 0): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback); - } - - public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback, maxRetries); - } -} diff --git a/src/table/session.ts b/src/table/session.ts deleted file mode 100644 index 5dbba772..00000000 --- a/src/table/session.ts +++ /dev/null @@ -1,126 +0,0 @@ -import {Ydb} from "ydb-sdk-proto"; -export import OperationMode = Ydb.Operations.OperationParams.OperationMode; -import EventEmitter from "events"; -import {Endpoint} from "../discovery"; -import {Logger} from "../logging"; -import * as grpc from "@grpc/grpc-js"; -import {SessionEvent} from "./session-pool"; -import {retryable} from "../retries"; -import {AsyncResponse, ensureOperationSucceeded, pessimizable, StreamEnd} from "../utils"; -import {ResponseMetadataKeys} from "../constants"; -import {MissingValue, YdbError} from "../errors"; -import * as $protobuf from "protobufjs"; - -interface PartialResponse { - status?: (Ydb.StatusIds.StatusCode|null); - issues?: (Ydb.Issue.IIssueMessage[]|null); - result?: (T|null); -} - - -export interface ServiceWithSessions extends EventEmitter { - createSession(request: Ydb.Table.ICreateSessionRequest): Promise; - deleteSession(request: Ydb.Table.IDeleteSessionRequest): Promise; - Alive?(request: Ydb.Table.IKeepAliveRequest): Promise; -} - -export interface SessionCreator> { - create(): Promise; -} - -export abstract class Session extends EventEmitter implements Ydb.Table.ICreateSessionResult { - private beingDeleted = false; - private free = true; - private closing = false; - - constructor(protected api: ServiceType, public endpoint: Endpoint, public sessionId: string, protected logger: Logger, protected getResponseMetadata: (request: object) => grpc.Metadata | undefined) { - super(); - } - - acquire() { - this.free = false; - this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); - return this; - } - - release() { - this.free = true; - this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); - this.emit(SessionEvent.SESSION_RELEASE, this); - } - - public isFree() { - return this.free && !this.isDeleted(); - } - - public isClosing() { - return this.closing; - } - - public isDeleted() { - return this.beingDeleted; - } - - @retryable() - @pessimizable - public async delete(): Promise { - if (this.isDeleted()) { - return Promise.resolve(); - } - this.beingDeleted = true; - ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); - } - - protected processResponseMetadata( - request: object, - response: AsyncResponse, - onResponseMetadata?: (metadata: grpc.Metadata) => void - ) { - const metadata = this.getResponseMetadata(request); - if (metadata) { - const serverHints = metadata.get(ResponseMetadataKeys.ServerHints) || []; - if (serverHints.includes('session-close')) { - this.closing = true; - } - onResponseMetadata?.(metadata); - } - return response; - } - - protected executeStreamRequest, IRes, Res>( - request: Req, - apiStreamMethod: (request: Req, callback: (error: (Error|null), response?: Resp) => void) => void, - transformer: (result: IRes) => Res, - consumer: (result: Res) => void) - : Promise { - return new Promise((resolve, reject) => { - apiStreamMethod(request, (error, response) => { - try { - if (error) { - if (error instanceof StreamEnd) { - resolve(); - } else { - reject(error); - } - } else if (response) { - const operation = { - status: response.status, - issues: response.issues, - } as Ydb.Operations.IOperation; - YdbError.checkStatus(operation); - - if (!response.result) { - reject(new MissingValue('Missing result value!')); - return; - } - - const result = transformer(response.result); - consumer(result); - } - } catch (e) { - reject(e); - } - }); - }); - } -} diff --git a/src/table/table-client.ts b/src/table/table-client.ts deleted file mode 100644 index 7580d73b..00000000 --- a/src/table/table-client.ts +++ /dev/null @@ -1,73 +0,0 @@ -import {Ydb} from "ydb-sdk-proto"; -import TableService = Ydb.Table.V1.TableService; -import CreateSessionRequest = Ydb.Table.CreateSessionRequest; -import CreateSessionResult = Ydb.Table.CreateSessionResult; -import EventEmitter from "events"; -import {IClientSettings, SessionPool} from "./session-pool"; -import {Endpoint} from "../discovery"; -import {IAuthService} from "../credentials"; -import {Logger} from "../logging"; -import {ISslCredentials} from "../ssl-credentials"; -import {AuthenticatedService, ClientOptions, getOperationPayload, pessimizable} from "../utils"; -import {SessionCreator} from "./session"; -import {retryable} from "../retries"; -import {TableSession} from "./table-session"; -import TransactionSettings = Ydb.Query.TransactionSettings; - -export class TableSessionCreator extends AuthenticatedService { - public endpoint: Endpoint; - private readonly logger: Logger; - - constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { - const host = endpoint.toString(); - super(host, database, 'Ydb.Table.V1.TableService', TableService, authService, sslCredentials, clientOptions); - this.endpoint = endpoint; - this.logger = logger; - } -7 - @retryable() - @pessimizable - async create(): Promise { - const response = await this.api.createSession(CreateSessionRequest.create()); - const payload = getOperationPayload(response); - const {sessionId} = CreateSessionResult.decode(payload); - return new TableSession(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); - } -} - -class TableSessionPool extends SessionPool { - protected getSessionServiceCreator( - endpoint: Endpoint, - database: string, - authService: IAuthService, - logger: Logger, - sslCredentials: ISslCredentials | undefined, - clientOptions: ClientOptions | undefined): SessionCreator { - return new TableSessionCreator(endpoint, database, authService, logger, sslCredentials, clientOptions); - } -} - -export class TableClient extends EventEmitter { - private pool: TableSessionPool; - - constructor(settings: IClientSettings) { - super(); - this.pool = new TableSessionPool(settings); - } - - public async withSession(callback: (session: TableSession) => Promise, timeout: number = 0): Promise { - return this.pool.withSession(callback, timeout); - } - - public async withSessionRetry(callback: (session: TableSession) => Promise, timeout: number = 0, maxRetries = 10): Promise { - return this.pool.withSessionRetry(callback, timeout, maxRetries); - } - - public async destroy() { - await this.pool.destroy(); - } -} - -class Test extends TransactionSettings { - -} diff --git a/src/test-utils.ts b/src/test-utils.ts index 65c949b9..f676d290 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -2,7 +2,7 @@ import fs from 'fs'; import path from 'path'; import Driver, {IDriverSettings} from "./driver"; import {declareType, TypedData, Types} from "./types"; -import {Column, TableSession, TableDescription} from "./table/table-session"; +import {Column, Session, TableDescription} from "./table"; import {withRetries} from "./retries"; import {AnonymousAuthService} from "./credentials"; @@ -55,7 +55,7 @@ export async function destroyDriver(driver: Driver): Promise { } } -export async function createTable(session: TableSession) { +export async function createTable(session: Session) { await session.dropTable(TABLE); await session.createTable( TABLE, @@ -72,7 +72,7 @@ export async function createTable(session: TableSession) { ); } -export async function fillTableWithData(session: TableSession, rows: Row[]) { +export async function fillTableWithData(session: Session, rows: Row[]) { const query = ` DECLARE $data AS List>;