From 21a4bea83a85dcdf607d9e82f2b322a3415f2a9a Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Wed, 24 Jan 2024 11:41:54 +0300 Subject: [PATCH] wip --- CHANGELOG.md | 6 +- src/__tests__/alter-table.test.ts | 4 +- src/__tests__/bulk-upsert.test.ts | 4 +- src/__tests__/bytestring-identity.test.ts | 6 +- src/__tests__/create-table.test.ts | 2 +- src/__tests__/read-table.test.ts | 4 +- src/__tests__/scan-query.test.ts | 4 +- src/driver.ts | 2 +- src/index.ts | 17 +- src/scheme.ts | 2 +- src/table/index.ts | 3 + src/table/session-pool.ts | 230 ++++++++++ src/table/session.ts | 281 ++++++++++++ src/{table.ts => table/table-client.ts} | 529 ++-------------------- src/test-utils.ts | 6 +- 15 files changed, 585 insertions(+), 515 deletions(-) create mode 100644 src/table/index.ts create mode 100644 src/table/session-pool.ts create mode 100644 src/table/session.ts rename src/{table.ts => table/table-client.ts} (59%) diff --git a/CHANGELOG.md b/CHANGELOG.md index f612ceac..333edd4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -286,7 +286,7 @@ All notable changes to this project will be documented in this file. See [standa - decimal value is present as string instead of bigint (it wasn't working for float values before) - fix uuid and tz-date types conversion (it wasn't working before) -* signatures of most methods in Session are changed: +* signatures of most methods in TableSession are changed: - executeQuery Before: `(query, params, txControl, operationParams?, settings?, collectStats?)` After: `(query, params, txControl, settings?)` @@ -336,7 +336,7 @@ All notable changes to this project will be documented in this file. See [standa * drop support of old environment variables ([963819a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/963819af9209a45749f5118077f1da4bdb390fa6)) * reorganize signature of SchemeClient's methods ([734d57a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/734d57a2dd7c655cf727b96df415212504339cf8)) -* reorganize signatures of Session's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) +* reorganize signatures of TableSession's methods ([431f149](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/431f1491bf880f3ba9541d9455d8dd2f2b7849e6)) * use identity names conversion in TypedData ([275598a](https://www.github.com/ydb-platform/ydb-nodejs-sdk/commit/275598aa444e1e977386a3dadd02bbc9ba01f38e)) ### [2.9.2](https://www.github.com/ydb-platform/ydb-nodejs-sdk/compare/v2.9.1...v2.9.2) (2022-02-09) @@ -431,7 +431,7 @@ All notable changes to this project will be documented in this file. See [standa * and many other changes in protobufs. ### 1.10.0 -* Add `alterTable` method to Session class +* Add `alterTable` method to TableSession class * Put compiled protobufs to a separate 'ydb-sdk' namespace ### 1.9.0 diff --git a/src/__tests__/alter-table.test.ts b/src/__tests__/alter-table.test.ts index 87c541b4..190c38e2 100644 --- a/src/__tests__/alter-table.test.ts +++ b/src/__tests__/alter-table.test.ts @@ -4,11 +4,11 @@ import { AlterTableDescription, AlterTableSettings, Column, - OperationParams, TableDescription, TableIndex, -} from '../table'; +} from '../table/table-client'; import { Types } from '../types'; +import {OperationParams} from "../table/session"; const getTableName = () => `table_alter_${Math.trunc(1000 * Math.random())}`; diff --git a/src/__tests__/bulk-upsert.test.ts b/src/__tests__/bulk-upsert.test.ts index 245a4056..3bbf7b4c 100644 --- a/src/__tests__/bulk-upsert.test.ts +++ b/src/__tests__/bulk-upsert.test.ts @@ -7,10 +7,10 @@ import { Row, TABLE } from '../test-utils'; -import {Session} from '../table'; +import {TableSession} from '../table/table-client'; import {Ydb} from 'ydb-sdk-proto'; -async function readTable(session: Session): Promise { +async function readTable(session: TableSession): Promise { const rows: Row[] = []; await session.streamReadTable(TABLE, (result) => { diff --git a/src/__tests__/bytestring-identity.test.ts b/src/__tests__/bytestring-identity.test.ts index c2c4f51b..6aa5d46e 100644 --- a/src/__tests__/bytestring-identity.test.ts +++ b/src/__tests__/bytestring-identity.test.ts @@ -1,10 +1,10 @@ import Driver from '../driver'; import {destroyDriver, initDriver, TABLE} from '../test-utils'; -import {Column, Session, TableDescription} from '../table'; +import {Column, TableSession, TableDescription} from '../table/table-client'; import {declareType, TypedData, Types} from '../types'; import {withRetries} from '../retries'; -async function createTable(session: Session) { +async function createTable(session: TableSession) { await session.dropTable(TABLE); await session.createTable( TABLE, @@ -46,7 +46,7 @@ class Row extends TypedData { } } -export async function fillTableWithData(session: Session, rows: Row[]) { +export async function fillTableWithData(session: TableSession, rows: Row[]) { const query = ` DECLARE $data AS List>; diff --git a/src/__tests__/create-table.test.ts b/src/__tests__/create-table.test.ts index 4933a011..598cd1d9 100644 --- a/src/__tests__/create-table.test.ts +++ b/src/__tests__/create-table.test.ts @@ -1,6 +1,6 @@ import Driver from '../driver'; import {destroyDriver, initDriver} from '../test-utils'; -import {Column, DescribeTableSettings, TableDescription} from '../table'; +import {Column, DescribeTableSettings, TableDescription} from '../table/table-client'; import {TypedValues, Types} from '../types'; import Long from 'long'; import {Ydb} from 'ydb-sdk-proto'; diff --git a/src/__tests__/read-table.test.ts b/src/__tests__/read-table.test.ts index 70047334..696c4da0 100644 --- a/src/__tests__/read-table.test.ts +++ b/src/__tests__/read-table.test.ts @@ -7,10 +7,10 @@ import { Row, TABLE } from '../test-utils'; -import {ReadTableSettings, Session} from '../table'; +import {ReadTableSettings, TableSession} from '../table/table-client'; import {TypedValues, TypedData} from '../types'; -async function readTable(session: Session, settings: ReadTableSettings): Promise { +async function readTable(session: TableSession, settings: ReadTableSettings): Promise { const rows: TypedData[] = []; await session.streamReadTable(TABLE, (result) => { diff --git a/src/__tests__/scan-query.test.ts b/src/__tests__/scan-query.test.ts index 04dbc428..0a61a9d4 100644 --- a/src/__tests__/scan-query.test.ts +++ b/src/__tests__/scan-query.test.ts @@ -7,10 +7,10 @@ import { initDriver, Row, } from '../test-utils'; -import {Session} from '../table'; +import {TableSession} from '../table/table-client'; import {TypedData} from '../types'; -async function executeScanQuery(session: Session): Promise { +async function executeScanQuery(session: TableSession): Promise { const query = `SELECT * FROM ${TABLE};`; const rows: TypedData[] = []; diff --git a/src/driver.ts b/src/driver.ts index 35d34b3b..250207b9 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -1,5 +1,5 @@ import DiscoveryService from './discovery'; -import {TableClient} from './table'; +import {TableClient} from './table/table-client'; import SchemeService from './scheme'; import {ENDPOINT_DISCOVERY_PERIOD} from './constants'; import {IAuthService} from './credentials'; diff --git a/src/index.ts b/src/index.ts index b381adc8..a823fbd9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -26,14 +26,10 @@ export { StringFunction, } from './types'; export { - SessionPool, - Session, + TableSession, CreateTableSettings, AlterTableSettings, DropTableSettings, - BeginTransactionSettings, - CommitTransactionSettings, - RollbackTransactionSettings, DescribeTableSettings, PrepareQuerySettings, ExecuteQuerySettings, @@ -54,9 +50,8 @@ export { CompactionPolicy, ExecutionPolicy, CachingPolicy, - OperationParams, - AUTO_TX, -} from './table'; + +} from './table/table-client'; export { MakeDirectorySettings, RemoveDirectorySettings, @@ -78,3 +73,9 @@ export { export {ISslCredentials} from './ssl-credentials'; export {withRetries, RetryParameters} from './retries'; export {YdbError, StatusCode} from './errors'; +export {SessionPool} from "./table/session-pool"; +export {RollbackTransactionSettings} from "./table/session"; +export {CommitTransactionSettings} from "./table/session"; +export {BeginTransactionSettings} from "./table/session"; +export {OperationParams} from "./table/session"; +export {AUTO_TX} from "./table/session"; diff --git a/src/scheme.ts b/src/scheme.ts index bf9bda06..6b2786fe 100644 --- a/src/scheme.ts +++ b/src/scheme.ts @@ -12,7 +12,6 @@ import {Logger} from './logging'; import DiscoveryService, {Endpoint} from './discovery'; import {retryable} from "./retries"; import {ISslCredentials} from './ssl-credentials'; -import {OperationParamsSettings} from './table'; import SchemeServiceAPI = Ydb.Scheme.V1.SchemeService; import ListDirectoryResult = Ydb.Scheme.ListDirectoryResult; @@ -22,6 +21,7 @@ import IMakeDirectoryRequest = Ydb.Scheme.IMakeDirectoryRequest; import IPermissions = Ydb.Scheme.IPermissions; import {util} from "protobufjs"; import EventEmitter = util.EventEmitter; +import {OperationParamsSettings} from "./table/session"; function preparePermissions(action?: IPermissions | null) { diff --git a/src/table/index.ts b/src/table/index.ts new file mode 100644 index 00000000..b1758268 --- /dev/null +++ b/src/table/index.ts @@ -0,0 +1,3 @@ +export * from './session-pool'; +export * from './session'; +export * from './table-client'; diff --git a/src/table/session-pool.ts b/src/table/session-pool.ts new file mode 100644 index 00000000..b508e397 --- /dev/null +++ b/src/table/session-pool.ts @@ -0,0 +1,230 @@ +import EventEmitter from "events"; +import {IAuthService} from "../credentials"; +import {ISslCredentials} from "../ssl-credentials"; +import {ClientOptions} from "../utils"; +import DiscoveryService, {Endpoint} from "../discovery"; +import {Logger} from "../logging"; +import {Events, SESSION_KEEPALIVE_PERIOD} from "../constants"; +import _ from "lodash"; +import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors"; +import {IPoolSettings} from "../driver"; +import {Session, SessionCreator} from "./session"; + +export enum SessionEvent { + SESSION_RELEASE = 'SESSION_RELEASE', + SESSION_BROKEN = 'SESSION_BROKEN' +} + +type SessionCallback, T> = (session: SessionType) => Promise; + +export interface IClientSettings { + database: string; + authService: IAuthService; + sslCredentials?: ISslCredentials; + poolSettings?: IPoolSettings; + clientOptions?: ClientOptions; + discoveryService: DiscoveryService; + logger: Logger; +} + +export abstract class SessionPool> extends EventEmitter { + private readonly database: string; + private readonly authService: IAuthService; + private readonly sslCredentials?: ISslCredentials; + private readonly clientOptions?: ClientOptions; + private readonly minLimit: number; + private readonly maxLimit: number; + private readonly sessions: Set; + private readonly sessionCreators: Map>; + private readonly discoveryService: DiscoveryService; + private newSessionsRequested: number; + private sessionsBeingDeleted: number; + private readonly sessionKeepAliveId: NodeJS.Timeout; + private readonly logger: Logger; + private readonly waiters: ((session: SessionType) => void)[] = []; + + private static SESSION_MIN_LIMIT = 5; + private static SESSION_MAX_LIMIT = 20; + + constructor(settings: IClientSettings) { + super(); + this.database = settings.database; + this.authService = settings.authService; + this.sslCredentials = settings.sslCredentials; + this.clientOptions = settings.clientOptions; + this.logger = settings.logger; + const poolSettings = settings.poolSettings; + this.minLimit = poolSettings?.minLimit || SessionPool.SESSION_MIN_LIMIT; + this.maxLimit = poolSettings?.maxLimit || SessionPool.SESSION_MAX_LIMIT; + this.sessions = new Set(); + this.newSessionsRequested = 0; + this.sessionsBeingDeleted = 0; + this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); + this.sessionCreators = new Map(); + this.discoveryService = settings.discoveryService; + this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { + this.sessionCreators.delete(endpoint); + }); + this.prepopulateSessions(); + } + + public async destroy(): Promise { + this.logger.debug('Destroying pool...'); + clearInterval(this.sessionKeepAliveId); + await Promise.all(_.map([...this.sessions], (session: SessionType) => this.deleteSession(session))); + this.logger.debug('Pool has been destroyed.'); + } + + private initListeners(keepAlivePeriod: number) { + return setInterval(async () => Promise.all( + _.map([...this.sessions], (session: SessionType) => { + return session.keepAlive() + // delete session if error + .catch(() => this.deleteSession(session)) + // ignore errors to avoid UnhandledPromiseRejectionWarning + .catch(() => Promise.resolve()) + }) + ), keepAlivePeriod); + } + + private prepopulateSessions() { + _.forEach(_.range(this.minLimit), () => this.createSession()); + } + + protected abstract getSessionServiceCreator( + endpoint: Endpoint, + database: string, + authService: IAuthService, + logger: Logger, + sslCredentials: ISslCredentials | undefined, + clientOptions: ClientOptions | undefined): SessionCreator; + + private async getSessionCreator(): Promise> { + const endpoint = await this.discoveryService.getEndpoint(); + if (!this.sessionCreators.has(endpoint)) { + const sessionService = this.getSessionServiceCreator(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); + this.sessionCreators.set(endpoint, sessionService); + } + return this.sessionCreators.get(endpoint)!; + } + + private maybeUseSession(session: SessionType) { + if (this.waiters.length > 0) { + const waiter = this.waiters.shift(); + if (typeof waiter === "function") { + waiter(session); + return true; + } + } + return false; + } + + private async createSession(): Promise { + const sessionCreator = await this.getSessionCreator(); + const session = await sessionCreator.create() as SessionType; + session.on(SessionEvent.SESSION_RELEASE, async () => { + if (session.isClosing()) { + await this.deleteSession(session); + } else { + this.maybeUseSession(session); + } + }) + session.on(SessionEvent.SESSION_BROKEN, async () => { + await this.deleteSession(session); + }); + this.sessions.add(session); + return session; + } + + private deleteSession(session: SessionType): Promise { + if (session.isDeleted()) { + return Promise.resolve(); + } + + this.sessionsBeingDeleted++; + // acquire new session as soon one of existing ones is deleted + if (this.waiters.length > 0) { + this.acquire().then((session) => { + if (!this.maybeUseSession(session)) { + session.release(); + } + }); + } + return session.delete() + // delete session in any case + .finally(() => { + this.sessions.delete(session); + this.sessionsBeingDeleted--; + }); + } + + private acquire(timeout: number = 0): Promise { + for (const session of this.sessions) { + if (session.isFree()) { + return Promise.resolve(session.acquire()); + } + } + + if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { + this.newSessionsRequested++; + return this.createSession() + .then((session) => { + return session.acquire(); + }) + .finally(() => { + this.newSessionsRequested--; + }); + } else { + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout; + + function waiter(session: SessionType) { + clearTimeout(timeoutId); + resolve(session.acquire()); + } + + if (timeout) { + timeoutId = setTimeout(() => { + this.waiters.splice(this.waiters.indexOf(waiter), 1); + reject( + new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) + ); + }, timeout); + } + this.waiters.push(waiter); + }); + } + } + + private async _withSession(session: SessionType, callback: SessionCallback, maxRetries = 0): Promise { + try { + const result = await callback(session); + session.release(); + return result; + } catch (error) { + if (error instanceof BadSession || error instanceof SessionBusy) { + this.logger.debug('Encountered bad or busy session, re-creating the session'); + session.emit(SessionEvent.SESSION_BROKEN); + session = await this.createSession(); + if (maxRetries > 0) { + this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); + session.acquire(); + return this._withSession(session, callback, maxRetries - 1); + } + } else { + session.release(); + } + throw error; + } + } + + public async withSession(callback: SessionCallback, timeout: number = 0): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback); + } + + public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { + const session = await this.acquire(timeout); + return this._withSession(session, callback, maxRetries); + } +} diff --git a/src/table/session.ts b/src/table/session.ts new file mode 100644 index 00000000..1df8187a --- /dev/null +++ b/src/table/session.ts @@ -0,0 +1,281 @@ +import {google, Ydb} from "ydb-sdk-proto"; +import ITransactionSettings = Ydb.Table.ITransactionSettings; +import BeginTransactionResult = Ydb.Table.BeginTransactionResult; +import ITransactionMeta = Ydb.Table.ITransactionMeta; +export import OperationMode = Ydb.Operations.OperationParams.OperationMode; +import EventEmitter from "events"; +import {Endpoint} from "../discovery"; +import {Logger} from "../logging"; +import * as grpc from "@grpc/grpc-js"; +import {SessionEvent} from "./session-pool"; +import {retryable} from "../retries"; +import {AsyncResponse, ensureOperationSucceeded, getOperationPayload, pessimizable, StreamEnd} from "../utils"; +import {ResponseMetadataKeys} from "../constants"; +import {MissingValue, YdbError} from "../errors"; + +export interface IExistingTransaction { + txId: string +} + +export interface INewTransaction { + beginTx: ITransactionSettings, + commitTx: boolean +} + +export const AUTO_TX: INewTransaction = { + beginTx: { + serializableReadWrite: {} + }, + commitTx: true +}; + +interface PartialResponse { + status?: (Ydb.StatusIds.StatusCode|null); + issues?: (Ydb.Issue.IIssueMessage[]|null); + result?: (T|null); +} + +export class OperationParams implements Ydb.Operations.IOperationParams { + operationMode?: OperationMode; + operationTimeout?: google.protobuf.IDuration; + cancelAfter?: google.protobuf.IDuration; + labels?: { [k: string]: string }; + reportCostInfo?: Ydb.FeatureFlag.Status; + + withSyncMode() { + this.operationMode = OperationMode.SYNC; + return this; + } + + withAsyncMode() { + this.operationMode = OperationMode.ASYNC; + return this; + } + + withOperationTimeout(duration: google.protobuf.IDuration) { + this.operationTimeout = duration; + return this; + } + + withOperationTimeoutSeconds(seconds: number) { + this.operationTimeout = {seconds}; + return this; + } + + withCancelAfter(duration: google.protobuf.IDuration) { + this.cancelAfter = duration; + return this; + } + + withCancelAfterSeconds(seconds: number) { + this.cancelAfter = {seconds}; + return this; + } + + withLabels(labels: { [k: string]: string }) { + this.labels = labels; + return this; + } + + withReportCostInfo() { + this.reportCostInfo = Ydb.FeatureFlag.Status.ENABLED; + return this; + } +} + +export class OperationParamsSettings { + operationParams?: OperationParams; + + withOperationParams(operationParams: OperationParams) { + this.operationParams = operationParams; + return this; + } +} + +export class BeginTransactionSettings extends OperationParamsSettings { +} + +export class CommitTransactionSettings extends OperationParamsSettings { + collectStats?: Ydb.Table.QueryStatsCollection.Mode; + + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { + this.collectStats = collectStats; + return this; + } +} + +export class RollbackTransactionSettings extends OperationParamsSettings { +} + +export interface ServiceWithSessionsAndTransactions extends EventEmitter { + createSession(request: Ydb.Table.ICreateSessionRequest): Promise; + deleteSession(request: Ydb.Table.IDeleteSessionRequest): Promise; + keepAlive?(request: Ydb.Table.IKeepAliveRequest): Promise; + + beginTransaction(request: Ydb.Table.IBeginTransactionRequest): Promise; + commitTransaction(request: Ydb.Table.ICommitTransactionRequest): Promise; + rollbackTransaction(request: Ydb.Table.IRollbackTransactionRequest): Promise; +} + +export interface SessionCreator> { + create(): Promise; +} + +export class Session extends EventEmitter implements Ydb.Table.ICreateSessionResult { + private beingDeleted = false; + private free = true; + private closing = false; + + constructor(protected api: ServiceType, public endpoint: Endpoint, public sessionId: string, protected logger: Logger, protected getResponseMetadata: (request: object) => grpc.Metadata | undefined) { + super(); + } + + acquire() { + this.free = false; + this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + return this; + } + + release() { + this.free = true; + this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + this.emit(SessionEvent.SESSION_RELEASE, this); + } + + public isFree() { + return this.free && !this.isDeleted(); + } + + public isClosing() { + return this.closing; + } + + public isDeleted() { + return this.beingDeleted; + } + + @retryable() + @pessimizable + public async delete(): Promise { + if (this.isDeleted()) { + return Promise.resolve(); + } + this.beingDeleted = true; + ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); + } + + @retryable() + @pessimizable + public async keepAlive(): Promise { + if (typeof this.api.keepAlive !== 'function') { + throw new Error('This service does not support keep alive method'); + } + const request = {sessionId: this.sessionId}; + const response = await this.api.keepAlive(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async beginTransaction( + txSettings: ITransactionSettings, + settings?: BeginTransactionSettings, + ): Promise { + const request: Ydb.Table.IBeginTransactionRequest = { + sessionId: this.sessionId, + txSettings, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.beginTransaction(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const {txMeta} = BeginTransactionResult.decode(payload); + if (txMeta) { + return txMeta; + } + throw new Error('Could not begin new transaction, txMeta is empty!'); + } + + @retryable() + @pessimizable + public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { + const request: Ydb.Table.ICommitTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + request.collectStats = settings.collectStats; + } + const response = await this.api.commitTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { + const request: Ydb.Table.IRollbackTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.rollbackTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + protected processResponseMetadata( + request: object, + response: AsyncResponse, + onResponseMetadata?: (metadata: grpc.Metadata) => void + ) { + const metadata = this.getResponseMetadata(request); + if (metadata) { + const serverHints = metadata.get(ResponseMetadataKeys.ServerHints) || []; + if (serverHints.includes('session-close')) { + this.closing = true; + } + onResponseMetadata?.(metadata); + } + return response; + } + + protected executeStreamRequest, IRes, Res>( + request: Req, + apiStreamMethod: (request: Req, callback: (error: (Error|null), response?: Resp) => void) => void, + transformer: (result: IRes) => Res, + consumer: (result: Res) => void) + : Promise { + return new Promise((resolve, reject) => { + apiStreamMethod(request, (error, response) => { + try { + if (error) { + if (error instanceof StreamEnd) { + resolve(); + } else { + reject(error); + } + } else if (response) { + const operation = { + status: response.status, + issues: response.issues, + } as Ydb.Operations.IOperation; + YdbError.checkStatus(operation); + + if (!response.result) { + reject(new MissingValue('Missing result value!')); + return; + } + + const result = transformer(response.result); + consumer(result); + } + } catch (e) { + reject(e); + } + }); + }); + } +} diff --git a/src/table.ts b/src/table/table-client.ts similarity index 59% rename from src/table.ts rename to src/table/table-client.ts index 6e6ad7ca..339e416f 100644 --- a/src/table.ts +++ b/src/table/table-client.ts @@ -1,47 +1,39 @@ -import _ from 'lodash'; import EventEmitter from 'events'; import * as grpc from '@grpc/grpc-js'; -import {google, Ydb} from 'ydb-sdk-proto'; +import {Ydb} from 'ydb-sdk-proto'; import { AuthenticatedService, ClientOptions, - StreamEnd, ensureOperationSucceeded, getOperationPayload, pessimizable, - AsyncResponse, -} from './utils'; -import DiscoveryService, {Endpoint} from './discovery'; -import {IPoolSettings} from './driver'; -import {ISslCredentials} from './ssl-credentials'; -import {Events, ResponseMetadataKeys, SESSION_KEEPALIVE_PERIOD} from './constants'; -import {IAuthService} from './credentials'; +} from '../utils'; +import {Endpoint} from '../discovery'; +import {ISslCredentials} from '../ssl-credentials'; +import {IAuthService} from '../credentials'; // noinspection ES6PreferShortImport -import {Logger} from './logging'; -import {retryable} from './retries'; -import { - SchemeError, - SessionPoolEmpty, - BadSession, - SessionBusy, - MissingValue, - YdbError, - MissingStatus, -} from './errors'; +import {Logger} from '../logging'; +import {retryable} from '../retries'; +import {MissingStatus, SchemeError} from '../errors'; +import {IClientSettings, SessionPool} from "./session-pool"; +import { + AUTO_TX, + IExistingTransaction, + INewTransaction, + OperationMode, + OperationParamsSettings, + Session, SessionCreator +} from "./session"; import TableService = Ydb.Table.V1.TableService; import CreateSessionRequest = Ydb.Table.CreateSessionRequest; -import ICreateSessionResult = Ydb.Table.ICreateSessionResult; import CreateSessionResult = Ydb.Table.CreateSessionResult; import IQuery = Ydb.Table.IQuery; import IType = Ydb.IType; import DescribeTableResult = Ydb.Table.DescribeTableResult; import PrepareQueryResult = Ydb.Table.PrepareQueryResult; import ExecuteQueryResult = Ydb.Table.ExecuteQueryResult; -import ExplainQueryResult = Ydb.Table.ExplainQueryResult -import ITransactionSettings = Ydb.Table.ITransactionSettings; -import BeginTransactionResult = Ydb.Table.BeginTransactionResult; -import ITransactionMeta = Ydb.Table.ITransactionMeta; +import ExplainQueryResult = Ydb.Table.ExplainQueryResult; import AutoPartitioningPolicy = Ydb.Table.PartitioningPolicy.AutoPartitioningPolicy; import ITypedValue = Ydb.ITypedValue; import FeatureFlag = Ydb.FeatureFlag.Status; @@ -50,15 +42,8 @@ import ExecuteScanQueryPartialResult = Ydb.Table.ExecuteScanQueryPartialResult; import IKeyRange = Ydb.Table.IKeyRange; import TypedValue = Ydb.TypedValue; import BulkUpsertResult = Ydb.Table.BulkUpsertResult; -import OperationMode = Ydb.Operations.OperationParams.OperationMode; -interface PartialResponse { - status?: (Ydb.StatusIds.StatusCode|null); - issues?: (Ydb.Issue.IIssueMessage[]|null); - result?: (T|null); -} - -export class SessionService extends AuthenticatedService { +export class TableSessionCreator extends AuthenticatedService { public endpoint: Endpoint; private readonly logger: Logger; @@ -71,95 +56,18 @@ export class SessionService extends AuthenticatedService { @retryable() @pessimizable - async create(): Promise { + async create(): Promise { const response = await this.api.createSession(CreateSessionRequest.create()); const payload = getOperationPayload(response); const {sessionId} = CreateSessionResult.decode(payload); - return new Session(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); + return new TableSession(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); } } -enum SessionEvent { - SESSION_RELEASE = 'SESSION_RELEASE', - SESSION_BROKEN = 'SESSION_BROKEN' -} - -interface IExistingTransaction { - txId: string -} -interface INewTransaction { - beginTx: ITransactionSettings, - commitTx: boolean -} - -export const AUTO_TX: INewTransaction = { - beginTx: { - serializableReadWrite: {} - }, - commitTx: true -}; - interface IQueryParams { [k: string]: Ydb.ITypedValue } -export class OperationParams implements Ydb.Operations.IOperationParams { - operationMode?: OperationMode; - operationTimeout?: google.protobuf.IDuration; - cancelAfter?: google.protobuf.IDuration; - labels?: { [k: string]: string }; - reportCostInfo?: Ydb.FeatureFlag.Status; - - withSyncMode() { - this.operationMode = OperationMode.SYNC; - return this; - } - - withAsyncMode() { - this.operationMode = OperationMode.ASYNC; - return this; - } - - withOperationTimeout(duration: google.protobuf.IDuration) { - this.operationTimeout = duration; - return this; - } - - withOperationTimeoutSeconds(seconds: number) { - this.operationTimeout = {seconds}; - return this; - } - - withCancelAfter(duration: google.protobuf.IDuration) { - this.cancelAfter = duration; - return this; - } - - withCancelAfterSeconds(seconds: number) { - this.cancelAfter = {seconds}; - return this; - } - - withLabels(labels: {[k: string]: string}) { - this.labels = labels; - return this; - } - - withReportCostInfo() { - this.reportCostInfo = Ydb.FeatureFlag.Status.ENABLED; - return this; - } -} - -export class OperationParamsSettings { - operationParams?: OperationParams; - - withOperationParams(operationParams: OperationParams) { - this.operationParams = operationParams; - return this; - } -} - export class CreateTableSettings extends OperationParamsSettings { } @@ -199,21 +107,6 @@ export class DescribeTableSettings extends OperationParamsSettings { } } -export class BeginTransactionSettings extends OperationParamsSettings { -} - -export class CommitTransactionSettings extends OperationParamsSettings { - collectStats?: Ydb.Table.QueryStatsCollection.Mode; - - withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { - this.collectStats = collectStats; - return this; - } -} - -export class RollbackTransactionSettings extends OperationParamsSettings { -} - export class PrepareQuerySettings extends OperationParamsSettings { } @@ -305,58 +198,16 @@ export class ExecuteScanQuerySettings { } } -export class Session extends EventEmitter implements ICreateSessionResult { - private beingDeleted = false; - private free = true; - private closing = false; +export class TableSession extends Session { constructor( - private api: TableService, - public endpoint: Endpoint, - public sessionId: string, - private logger: Logger, - private getResponseMetadata: (request: object) => grpc.Metadata | undefined + api: TableService, + endpoint: Endpoint, + sessionId: string, + logger: Logger, + getResponseMetadata: (request: object) => grpc.Metadata | undefined ) { - super(); - } - - acquire() { - this.free = false; - this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); - return this; - } - release() { - this.free = true; - this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); - this.emit(SessionEvent.SESSION_RELEASE, this); - } - - public isFree() { - return this.free && !this.isDeleted(); - } - public isClosing() { - return this.closing; - } - public isDeleted() { - return this.beingDeleted; - } - - @retryable() - @pessimizable - public async delete(): Promise { - if (this.isDeleted()) { - return Promise.resolve(); - } - this.beingDeleted = true; - ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); - } - - @retryable() - @pessimizable - public async keepAlive(): Promise { - const request = {sessionId: this.sessionId}; - const response = await this.api.keepAlive(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + super(api, endpoint, sessionId, logger, getResponseMetadata); } @retryable() @@ -460,57 +311,6 @@ export class Session extends EventEmitter implements ICreateSessionResult { return Ydb.Table.DescribeTableOptionsResult.decode(payload); } - @retryable() - @pessimizable - public async beginTransaction( - txSettings: ITransactionSettings, - settings?: BeginTransactionSettings, - ): Promise { - const request: Ydb.Table.IBeginTransactionRequest = { - sessionId: this.sessionId, - txSettings, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.beginTransaction(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - const {txMeta} = BeginTransactionResult.decode(payload); - if (txMeta) { - return txMeta; - } - throw new Error('Could not begin new transaction, txMeta is empty!'); - } - - @retryable() - @pessimizable - public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { - const request: Ydb.Table.ICommitTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; - request.collectStats = settings.collectStats; - } - const response = await this.api.commitTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - - @retryable() - @pessimizable - public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { - const request: Ydb.Table.IRollbackTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.rollbackTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - @retryable() @pessimizable public async prepareQuery(queryText: string, settings?: PrepareQuerySettings): Promise { @@ -567,22 +367,6 @@ export class Session extends EventEmitter implements ICreateSessionResult { return ExecuteQueryResult.decode(payload); } - private processResponseMetadata( - request: object, - response: AsyncResponse, - onResponseMetadata?: (metadata: grpc.Metadata) => void - ) { - const metadata = this.getResponseMetadata(request); - if (metadata) { - const serverHints = metadata.get(ResponseMetadataKeys.ServerHints) || []; - if (serverHints.includes('session-close')) { - this.closing = true; - } - onResponseMetadata?.(metadata); - } - return response; - } - @pessimizable public async bulkUpsert(tablePath: string, rows: TypedValue, settings?: BulkUpsertSettings) { const request: Ydb.Table.IBulkUpsertRequest = { @@ -654,43 +438,6 @@ export class Session extends EventEmitter implements ICreateSessionResult { consumer); } - private executeStreamRequest, IRes, Res>( - request: Req, - apiStreamMethod: (request: Req, callback: (error: (Error|null), response?: Resp) => void) => void, - transformer: (result: IRes) => Res, - consumer: (result: Res) => void) - : Promise { - return new Promise((resolve, reject) => { - apiStreamMethod(request, (error, response) => { - try { - if (error) { - if (error instanceof StreamEnd) { - resolve(); - } else { - reject(error); - } - } else if (response) { - const operation = { - status: response.status, - issues: response.issues, - } as Ydb.Operations.IOperation; - YdbError.checkStatus(operation); - - if (!response.result) { - reject(new MissingValue('Missing result value!')); - return; - } - - const result = transformer(response.result); - consumer(result); - } - } catch (e) { - reject(e); - } - }); - }); - } - public async explainQuery(query: string, operationParams?: Ydb.Operations.IOperationParams): Promise { const request: Ydb.Table.IExplainDataQueryRequest = { sessionId: this.sessionId, @@ -703,223 +450,31 @@ export class Session extends EventEmitter implements ICreateSessionResult { } } -type SessionCallback = (session: Session) => Promise; - -interface ITableClientSettings { - database: string; - authService: IAuthService; - sslCredentials?: ISslCredentials; - poolSettings?: IPoolSettings; - clientOptions?: ClientOptions; - discoveryService: DiscoveryService; - logger: Logger; -} - -export class SessionPool extends EventEmitter { - private readonly database: string; - private readonly authService: IAuthService; - private readonly sslCredentials?: ISslCredentials; - private readonly clientOptions?: ClientOptions; - private readonly minLimit: number; - private readonly maxLimit: number; - private readonly sessions: Set; - private readonly sessionCreators: Map; - private readonly discoveryService: DiscoveryService; - private newSessionsRequested: number; - private sessionsBeingDeleted: number; - private readonly sessionKeepAliveId: NodeJS.Timeout; - private readonly logger: Logger; - private readonly waiters: ((session: Session) => void)[] = []; - - private static SESSION_MIN_LIMIT = 5; - private static SESSION_MAX_LIMIT = 20; - - constructor(settings: ITableClientSettings) { - super(); - this.database = settings.database; - this.authService = settings.authService; - this.sslCredentials = settings.sslCredentials; - this.clientOptions = settings.clientOptions; - this.logger = settings.logger; - const poolSettings = settings.poolSettings; - this.minLimit = poolSettings?.minLimit || SessionPool.SESSION_MIN_LIMIT; - this.maxLimit = poolSettings?.maxLimit || SessionPool.SESSION_MAX_LIMIT; - this.sessions = new Set(); - this.newSessionsRequested = 0; - this.sessionsBeingDeleted = 0; - this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); - this.sessionCreators = new Map(); - this.discoveryService = settings.discoveryService; - this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { - this.sessionCreators.delete(endpoint); - }); - this.prepopulateSessions(); - } - - public async destroy(): Promise { - this.logger.debug('Destroying pool...'); - clearInterval(this.sessionKeepAliveId); - await Promise.all(_.map([...this.sessions], (session: Session) => this.deleteSession(session))); - this.logger.debug('Pool has been destroyed.'); - } - - private initListeners(keepAlivePeriod: number) { - return setInterval(async () => Promise.all( - _.map([...this.sessions], (session: Session) => { - return session.keepAlive() - // delete session if error - .catch(() => this.deleteSession(session)) - // ignore errors to avoid UnhandledPromiseRejectionWarning - .catch(() => Promise.resolve()) - }) - ), keepAlivePeriod); - } - - private prepopulateSessions() { - _.forEach(_.range(this.minLimit), () => this.createSession()); - } - - private async getSessionCreator(): Promise { - const endpoint = await this.discoveryService.getEndpoint(); - if (!this.sessionCreators.has(endpoint)) { - const sessionService = new SessionService(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); - this.sessionCreators.set(endpoint, sessionService); - } - return this.sessionCreators.get(endpoint) as SessionService; - } - - private maybeUseSession(session: Session) { - if (this.waiters.length > 0) { - const waiter = this.waiters.shift(); - if (typeof waiter === "function") { - waiter(session); - return true; - } - } - return false; - } - - private async createSession(): Promise { - const sessionCreator = await this.getSessionCreator(); - const session = await sessionCreator.create(); - session.on(SessionEvent.SESSION_RELEASE, async () => { - if (session.isClosing()) { - await this.deleteSession(session); - } else { - this.maybeUseSession(session); - } - }) - session.on(SessionEvent.SESSION_BROKEN, async () => { - await this.deleteSession(session); - }); - this.sessions.add(session); - return session; - } - - private deleteSession(session: Session): Promise { - if (session.isDeleted()) { - return Promise.resolve(); - } - - this.sessionsBeingDeleted++; - // acquire new session as soon one of existing ones is deleted - if (this.waiters.length > 0) { - this.acquire().then((session) => { - if (!this.maybeUseSession(session)) { - session.release(); - } - }); - } - return session.delete() - // delete session in any case - .finally(() => { - this.sessions.delete(session); - this.sessionsBeingDeleted--; - }); - } - - private acquire(timeout: number = 0): Promise { - for (const session of this.sessions) { - if (session.isFree()) { - return Promise.resolve(session.acquire()); - } - } - - if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { - this.newSessionsRequested++; - return this.createSession() - .then((session) => { - return session.acquire(); - }) - .finally(() => { - this.newSessionsRequested--; - }); - } else { - return new Promise((resolve, reject) => { - let timeoutId: NodeJS.Timeout; - function waiter(session: Session) { - clearTimeout(timeoutId); - resolve(session.acquire()); - } - if (timeout) { - timeoutId = setTimeout(() => { - this.waiters.splice(this.waiters.indexOf(waiter), 1); - reject( - new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) - ); - }, timeout); - } - this.waiters.push(waiter); - }); - } - } - - private async _withSession(session: Session, callback: SessionCallback, maxRetries = 0): Promise { - try { - const result = await callback(session); - session.release(); - return result; - } catch (error) { - if (error instanceof BadSession || error instanceof SessionBusy) { - this.logger.debug('Encountered bad or busy session, re-creating the session'); - session.emit(SessionEvent.SESSION_BROKEN); - session = await this.createSession(); - if (maxRetries > 0) { - this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); - session.acquire(); - return this._withSession(session, callback, maxRetries - 1); - } - } else { - session.release(); - } - throw error; - } - } - - public async withSession(callback: SessionCallback, timeout: number = 0): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback); - } - - public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback, maxRetries); +class TableSessionPool extends SessionPool { + protected getSessionServiceCreator( + endpoint: Endpoint, + database: string, + authService: IAuthService, + logger: Logger, + sslCredentials: ISslCredentials | undefined, + clientOptions: ClientOptions | undefined): SessionCreator { + return new TableSessionCreator(endpoint, database, authService, logger, sslCredentials, clientOptions); } } export class TableClient extends EventEmitter { - private pool: SessionPool; + private pool: TableSessionPool; - constructor(settings: ITableClientSettings) { + constructor(settings: IClientSettings) { super(); - this.pool = new SessionPool(settings); + this.pool = new TableSessionPool(settings); } - public async withSession(callback: (session: Session) => Promise, timeout: number = 0): Promise { + public async withSession(callback: (session: TableSession) => Promise, timeout: number = 0): Promise { return this.pool.withSession(callback, timeout); } - public async withSessionRetry(callback: (session: Session) => Promise, timeout: number = 0, maxRetries = 10): Promise { + public async withSessionRetry(callback: (session: TableSession) => Promise, timeout: number = 0, maxRetries = 10): Promise { return this.pool.withSessionRetry(callback, timeout, maxRetries); } diff --git a/src/test-utils.ts b/src/test-utils.ts index f676d290..e06f2062 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -2,7 +2,7 @@ import fs from 'fs'; import path from 'path'; import Driver, {IDriverSettings} from "./driver"; import {declareType, TypedData, Types} from "./types"; -import {Column, Session, TableDescription} from "./table"; +import {Column, TableSession, TableDescription} from "./table/table-client"; import {withRetries} from "./retries"; import {AnonymousAuthService} from "./credentials"; @@ -55,7 +55,7 @@ export async function destroyDriver(driver: Driver): Promise { } } -export async function createTable(session: Session) { +export async function createTable(session: TableSession) { await session.dropTable(TABLE); await session.createTable( TABLE, @@ -72,7 +72,7 @@ export async function createTable(session: Session) { ); } -export async function fillTableWithData(session: Session, rows: Row[]) { +export async function fillTableWithData(session: TableSession, rows: Row[]) { const query = ` DECLARE $data AS List>;