diff --git "a/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" "b/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" index 31414a4a..34ade693 100644 --- "a/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" +++ "b/src/__tests__/e2e/query-service/exec-qu\321\203ry.test.ts" @@ -49,7 +49,7 @@ describe('Query service', () => { const res = await driver.queryClient.exec({ query: 'SELECT * FROM ${TABLE}', - // Is callback a good name? + // Is cb a good name? callback: (session: QuerySession) => { // session.beginTransaction(), // TODO: query -> array diff --git a/src/query/query-client.ts b/src/query/query-client.ts new file mode 100644 index 00000000..3882d377 --- /dev/null +++ b/src/query/query-client.ts @@ -0,0 +1,76 @@ +import EventEmitter from "events"; +import {RetryParameters, withRetries} from "../retries"; +import {BadSession, SessionBusy} from "../errors"; +import {SessionEvent} from "../table"; +import {QuerySession} from "./query-session"; +import {IAuthService} from "../credentials"; +import {ISslCredentials} from "../ssl-credentials"; +import {IPoolSettings} from "../driver"; +import {ClientOptions} from "../utils"; +import DiscoveryService from "../discovery"; +import {Logger} from "../logging"; +import {QuerySessionsPool} from "./query-sessions-pool"; +import {Ydb} from "ydb-sdk-proto"; +import TransactionSettings = Ydb.Query.TransactionSettings; + +type SessionCallback = (session: QuerySession) => Promise; +export interface IQueryClientSettings { + database: string; + authService: IAuthService; + sslCredentials?: ISslCredentials; + poolSettings?: IPoolSettings; + clientOptions?: ClientOptions; + discoveryService: DiscoveryService; + logger: Logger; +} + +const RETRY_PARAMETERS = new RetryParameters(); + +export class QueryClient extends EventEmitter { + private pool: QuerySessionsPool; + + constructor(settings: IQueryClientSettings) { + super(); + this.pool = new QuerySessionsPool(settings); + } + + public async do(options: { + cb: SessionCallback, + tx: TransactionSettings, + // timeout: number | undefined, + }): Promise { + if (!(typeof options.cb === 'function')) throw new Error(`Invalid options.cb: ${options.cb}`); + // if (!(options.timeout === undefined || options.timeout > 0)) throw new Error(`Invalid options.timeout: ${options.timeout}`); + + const {cb, tx/*, timeout*/} = options; + + return await withRetries(async () => { + const session = await this.pool.acquire(); // TODO: Shouldn't be a separated session acquire timeout + try { + if (tx) { + await session.beginTransaction(tx); + } + const res = cb(session); + if (tx) { + await session.commitTransaction(); + } + session.release(); + return res; + } catch (error) { + if (error instanceof BadSession || error instanceof SessionBusy) { + session.emit(SessionEvent.SESSION_BROKEN); + } else { + if (tx) { + await session.rollbackTransaction(); + } + session.release(); + } + throw error; + } + }, RETRY_PARAMETERS); + } + + public async destroy() { + await this.pool.destroy(); + } +} diff --git a/src/query/query-session.ts b/src/query/query-session.ts index 5ffdaa3d..c0e9b06d 100644 --- a/src/query/query-session.ts +++ b/src/query/query-session.ts @@ -12,11 +12,16 @@ import ITransactionSettings = Ydb.Query.ITransactionSettings; import {SessionEvent} from "../table"; import {ensureOperationSucceeded} from "./query-utils"; +interface IExistingTransaction { + txId: string +} + export class QuerySession extends EventEmitter implements ICreateSessionResponse { // TODO: Allocate common functionality with querySession to a sessionBase class. It's likely that commo sessionsPool code will work both Query and Query private beingDeleted = false; private free = true; private closing = false; + private txId?: string; constructor( private api: QueryService, @@ -79,38 +84,38 @@ export class QuerySession extends EventEmitter implements ICreateSessionResponse public async beginTransaction( txSettings: ITransactionSettings, )/*: Promise*/ { - // TODO: Add transactions pairs lock. How to handle errors? + if (this.txId) throw new Error('There is already opened transaction'); const request: Ydb.Query.IBeginTransactionRequest = { sessionId: this.sessionId, txSettings, }; const response = ensureOperationSucceeded(await this.api.beginTransaction(request)); const {txMeta} = response; - if (txMeta) { - return txMeta; - } - throw new Error('Could not begin new transaction, txMeta is empty!'); + if (!txMeta?.id) throw new Error('Could not begin new transaction, txMeta.id is empty!'); + this.txId = txMeta!.id!; } - // @retryable() - // @pessimizable - // public async commitTransaction(txControl: IExistingTransaction): Promise { - // const request: Ydb.Query.ICommitTransactionRequest = { - // sessionId: this.sessionId, - // txId: txControl.txId, - // }; - // const response = await this.api.commitTransaction(request); - // ensureOperationSucceeded(this.processResponseMetadata(request, response)); - // } + @retryable() + @pessimizable + public async commitTransaction(): Promise { + if (!this.txId) throw new Error('There is no an open transaction'); + const request: Ydb.Query.ICommitTransactionRequest = { + sessionId: this.sessionId, + txId: this.txId, + }; + delete this.txId; + ensureOperationSucceeded(await this.api.commitTransaction(request)); + } - // @retryable() - // @pessimizable - // public async rollbackTransaction(txControl: IExistingTransaction): Promise { - // const request: Ydb.Query.IRollbackTransactionRequest = { - // sessionId: this.sessionId, - // txId: txControl.txId, - // }; - // const response = await this.api.rollbackTransaction(request); - // ensureOperationSucceeded(this.processResponseMetadata(request, response)); - // } + @retryable() + @pessimizable + public async rollbackTransaction(): Promise { + if (!this.txId) throw new Error('There is no an open transaction'); + const request: Ydb.Query.IRollbackTransactionRequest = { + sessionId: this.sessionId, + txId: this.txId, + }; + delete this.txId; + await this.api.rollbackTransaction(request); + } } diff --git a/src/query/query-sessions-pool.ts b/src/query/query-sessions-pool.ts index 396a7018..baf7a170 100644 --- a/src/query/query-sessions-pool.ts +++ b/src/query/query-sessions-pool.ts @@ -1,20 +1,20 @@ import {Ydb} from 'ydb-sdk-proto'; import {IAuthService} from "../credentials"; import {ISslCredentials} from "../ssl-credentials"; -import {IPoolSettings} from "../driver"; import {AuthenticatedService, ClientOptions, pessimizable} from "../utils"; import DiscoveryService, {Endpoint} from "../discovery"; import {Logger} from "../logging"; import EventEmitter from "events"; import {Events} from "../constants"; import _ from "lodash"; -import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors"; +import {SessionPoolEmpty} from "../errors"; import {retryable} from "../retries"; import {QuerySession} from "./query-session"; import {SessionEvent} from "../table/session-event"; +import {ensureOperationSucceeded} from "./query-utils"; +import {IQueryClientSettings} from "./query-client"; import QueryService = Ydb.Query.V1.QueryService; import CreateSessionRequest = Ydb.Query.CreateSessionRequest; -import {ensureOperationSucceeded} from "./query-utils"; export class QuerySessionCreator extends AuthenticatedService { public endpoint: Endpoint; @@ -40,18 +40,6 @@ export class QuerySessionCreator extends AuthenticatedService { } } -type SessionCallback = (session: QuerySession) => Promise; - -interface IQueryClientSettings { - database: string; - authService: IAuthService; - sslCredentials?: ISslCredentials; - poolSettings?: IPoolSettings; - clientOptions?: ClientOptions; - discoveryService: DiscoveryService; - logger: Logger; -} - export class QuerySessionsPool extends EventEmitter { private readonly database: string; private readonly authService: IAuthService; @@ -198,83 +186,5 @@ export class QuerySessionsPool extends EventEmitter { }); } } - - public async do(session: QuerySession, callback: SessionCallback, maxRetries = 0): Promise { - try { - const result = await callback(session); - session.release(); - return result; - } catch (error) { - // TODO: Change the repetition strategy to one with different delays - // TODO: Remove repetitions on methods (@Retry) within session - // TODO: Add idempotency sign and do method with named parameters - // TODO: Мark _withSession as deprecated. Consider all operationj NOT idempotent - if (error instanceof BadSession || error instanceof SessionBusy) { - this.logger.debug('Encountered bad or busy session, re-creating the session'); - session.emit(SessionEvent.SESSION_BROKEN); - session = await this.createSession(); - if (maxRetries > 0) { - this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); - session.acquire(); - return this._withSession(session, callback, maxRetries - 1); - } - } else { - session.release(); - } - throw error; - } - } - - public async do(options: { - cb: SessionCallback, - timeout: number | undefined, - // TODO: Make all parameters - // txControl - // parameters - // transaction - // retries - // TODO: Should that to be Random strategy - }): T { - const session = await this.acquire(options.timeout); - // TODO: Start transaction - return this._withSession(session, options.cb); - } - - /* - public async withSession(callback: SessionCallback, timeout: number = 0): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback); - } - - public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback, maxRetries); - } - */ } -export class QueryClient extends EventEmitter { - private pool: QuerySessionsPool; - - constructor(settings: IQueryClientSettings) { - super(); - this.pool = new QuerySessionsPool(settings); - } - - private async do(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { - return this.pool.withSession(callback, timeout); - } - -/* - private async withSession(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { - return this.pool.withSession(callback, timeout); - } - - private async withSessionRetry(callback: (session: QuerySession) => Promise, timeout: number = 0, maxRetries = 10): Promise { - return this.pool.withSessionRetry(callback, timeout, maxRetries); - } -*/ - - public async destroy() { - await this.pool.destroy(); - } -} diff --git a/src/table/table-session.ts b/src/table/table-session.ts index d82c7cae..f5d77a7c 100644 --- a/src/table/table-session.ts +++ b/src/table/table-session.ts @@ -427,6 +427,7 @@ export class TableSession extends EventEmitter implements ICreateSessionResult { txSettings: ITransactionSettings, settings?: BeginTransactionSettings, ): Promise { + // TODO: Cosider keep txId in the session (one transaction in a time in a session policy) as i n query service const request: Ydb.Table.IBeginTransactionRequest = { sessionId: this.sessionId, txSettings, diff --git a/src/table/table-sessions-pool.ts b/src/table/table-sessions-pool.ts index aa55c07f..89f0d522 100644 --- a/src/table/table-sessions-pool.ts +++ b/src/table/table-sessions-pool.ts @@ -159,9 +159,9 @@ export class TableSessionsPool extends EventEmitter { this.sessionsBeingDeleted++; // acquire new session as soon one of existing ones is deleted if (this.waiters.length > 0) { - this.acquire().then((session) => { - if (!this.maybeUseSession(session)) { - session.release(); + this.acquire().then((newSession) => { + if (!this.maybeUseSession(newSession)) { + newSession.release(); } }); }