Skip to content

Commit

Permalink
feat: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Zorkaltsev committed Jan 31, 2024
1 parent cfb545a commit 7a53a81
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 122 deletions.
2 changes: 1 addition & 1 deletion src/__tests__/e2e/query-service/exec-quуry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('Query service', () => {

const res = await driver.queryClient.exec({
query: 'SELECT * FROM ${TABLE}',
// Is callback a good name?
// Is cb a good name?
callback: (session: QuerySession) => {
// session.beginTransaction(),
// TODO: query -> array
Expand Down
76 changes: 76 additions & 0 deletions src/query/query-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import EventEmitter from "events";
import {RetryParameters, withRetries} from "../retries";
import {BadSession, SessionBusy} from "../errors";
import {SessionEvent} from "../table";
import {QuerySession} from "./query-session";
import {IAuthService} from "../credentials";
import {ISslCredentials} from "../ssl-credentials";
import {IPoolSettings} from "../driver";
import {ClientOptions} from "../utils";
import DiscoveryService from "../discovery";
import {Logger} from "../logging";
import {QuerySessionsPool} from "./query-sessions-pool";
import {Ydb} from "ydb-sdk-proto";
import TransactionSettings = Ydb.Query.TransactionSettings;

type SessionCallback<T> = (session: QuerySession) => Promise<T>;
export interface IQueryClientSettings {
database: string;
authService: IAuthService;
sslCredentials?: ISslCredentials;
poolSettings?: IPoolSettings;
clientOptions?: ClientOptions;
discoveryService: DiscoveryService;
logger: Logger;
}

const RETRY_PARAMETERS = new RetryParameters();

export class QueryClient extends EventEmitter {
private pool: QuerySessionsPool;

constructor(settings: IQueryClientSettings) {
super();
this.pool = new QuerySessionsPool(settings);
}

public async do<T>(options: {
cb: SessionCallback<T>,
tx: TransactionSettings,
// timeout: number | undefined,
}): Promise<T> {
if (!(typeof options.cb === 'function')) throw new Error(`Invalid options.cb: ${options.cb}`);
// if (!(options.timeout === undefined || options.timeout > 0)) throw new Error(`Invalid options.timeout: ${options.timeout}`);

const {cb, tx/*, timeout*/} = options;

return await withRetries(async () => {
const session = await this.pool.acquire(); // TODO: Shouldn't be a separated session acquire timeout
try {
if (tx) {
await session.beginTransaction(tx);
}
const res = cb(session);
if (tx) {
await session.commitTransaction();
}
session.release();
return res;
} catch (error) {
if (error instanceof BadSession || error instanceof SessionBusy) {
session.emit(SessionEvent.SESSION_BROKEN);
} else {
if (tx) {
await session.rollbackTransaction();
}
session.release();
}
throw error;
}
}, RETRY_PARAMETERS);
}

public async destroy() {
await this.pool.destroy();
}
}
55 changes: 30 additions & 25 deletions src/query/query-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ import ITransactionSettings = Ydb.Query.ITransactionSettings;
import {SessionEvent} from "../table";
import {ensureOperationSucceeded} from "./query-utils";

interface IExistingTransaction {
txId: string
}

export class QuerySession extends EventEmitter implements ICreateSessionResponse {
// TODO: Allocate common functionality with querySession to a sessionBase class. It's likely that commo sessionsPool code will work both Query and Query
private beingDeleted = false;
private free = true;
private closing = false;
private txId?: string;

constructor(
private api: QueryService,
Expand Down Expand Up @@ -79,38 +84,38 @@ export class QuerySession extends EventEmitter implements ICreateSessionResponse
public async beginTransaction(
txSettings: ITransactionSettings,
)/*: Promise<ITransactionMeta>*/ {
// TODO: Add transactions pairs lock. How to handle errors?
if (this.txId) throw new Error('There is already opened transaction');
const request: Ydb.Query.IBeginTransactionRequest = {
sessionId: this.sessionId,
txSettings,
};
const response = ensureOperationSucceeded(await this.api.beginTransaction(request));
const {txMeta} = response;
if (txMeta) {
return txMeta;
}
throw new Error('Could not begin new transaction, txMeta is empty!');
if (!txMeta?.id) throw new Error('Could not begin new transaction, txMeta.id is empty!');
this.txId = txMeta!.id!;
}

// @retryable()
// @pessimizable
// public async commitTransaction(txControl: IExistingTransaction): Promise<void> {
// const request: Ydb.Query.ICommitTransactionRequest = {
// sessionId: this.sessionId,
// txId: txControl.txId,
// };
// const response = await this.api.commitTransaction(request);
// ensureOperationSucceeded(this.processResponseMetadata(request, response));
// }
@retryable()
@pessimizable
public async commitTransaction(): Promise<void> {
if (!this.txId) throw new Error('There is no an open transaction');
const request: Ydb.Query.ICommitTransactionRequest = {
sessionId: this.sessionId,
txId: this.txId,
};
delete this.txId;
ensureOperationSucceeded(await this.api.commitTransaction(request));
}

// @retryable()
// @pessimizable
// public async rollbackTransaction(txControl: IExistingTransaction): Promise<void> {
// const request: Ydb.Query.IRollbackTransactionRequest = {
// sessionId: this.sessionId,
// txId: txControl.txId,
// };
// const response = await this.api.rollbackTransaction(request);
// ensureOperationSucceeded(this.processResponseMetadata(request, response));
// }
@retryable()
@pessimizable
public async rollbackTransaction(): Promise<void> {
if (!this.txId) throw new Error('There is no an open transaction');
const request: Ydb.Query.IRollbackTransactionRequest = {
sessionId: this.sessionId,
txId: this.txId,
};
delete this.txId;
await this.api.rollbackTransaction(request);
}
}
96 changes: 3 additions & 93 deletions src/query/query-sessions-pool.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import {Ydb} from 'ydb-sdk-proto';
import {IAuthService} from "../credentials";
import {ISslCredentials} from "../ssl-credentials";
import {IPoolSettings} from "../driver";
import {AuthenticatedService, ClientOptions, pessimizable} from "../utils";
import DiscoveryService, {Endpoint} from "../discovery";
import {Logger} from "../logging";
import EventEmitter from "events";
import {Events} from "../constants";
import _ from "lodash";
import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors";
import {SessionPoolEmpty} from "../errors";
import {retryable} from "../retries";
import {QuerySession} from "./query-session";
import {SessionEvent} from "../table/session-event";
import {ensureOperationSucceeded} from "./query-utils";
import {IQueryClientSettings} from "./query-client";
import QueryService = Ydb.Query.V1.QueryService;
import CreateSessionRequest = Ydb.Query.CreateSessionRequest;
import {ensureOperationSucceeded} from "./query-utils";

export class QuerySessionCreator extends AuthenticatedService<QueryService> {
public endpoint: Endpoint;
Expand All @@ -40,18 +40,6 @@ export class QuerySessionCreator extends AuthenticatedService<QueryService> {
}
}

type SessionCallback<T> = (session: QuerySession) => Promise<T>;

interface IQueryClientSettings {
database: string;
authService: IAuthService;
sslCredentials?: ISslCredentials;
poolSettings?: IPoolSettings;
clientOptions?: ClientOptions;
discoveryService: DiscoveryService;
logger: Logger;
}

export class QuerySessionsPool extends EventEmitter {
private readonly database: string;
private readonly authService: IAuthService;
Expand Down Expand Up @@ -198,83 +186,5 @@ export class QuerySessionsPool extends EventEmitter {
});
}
}

public async do<T>(session: QuerySession, callback: SessionCallback<T>, maxRetries = 0): Promise<T> {
try {
const result = await callback(session);
session.release();
return result;
} catch (error) {
// TODO: Change the repetition strategy to one with different delays
// TODO: Remove repetitions on methods (@Retry) within session
// TODO: Add idempotency sign and do method with named parameters
// TODO: Мark _withSession as deprecated. Consider all operationj NOT idempotent
if (error instanceof BadSession || error instanceof SessionBusy) {
this.logger.debug('Encountered bad or busy session, re-creating the session');
session.emit(SessionEvent.SESSION_BROKEN);
session = await this.createSession();
if (maxRetries > 0) {
this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`);
session.acquire();
return this._withSession(session, callback, maxRetries - 1);
}
} else {
session.release();
}
throw error;
}
}

public async do<T>(options: {
cb: SessionCallback<T>,
timeout: number | undefined,
// TODO: Make all parameters
// txControl
// parameters
// transaction
// retries - // TODO: Should that to be Random strategy
}): T {
const session = await this.acquire(options.timeout);
// TODO: Start transaction
return this._withSession(session, options.cb);
}

/*
public async withSession<T>(callback: SessionCallback<T>, timeout: number = 0): Promise<T> {
const session = await this.acquire(timeout);
return this._withSession(session, callback);
}
public async withSessionRetry<T>(callback: SessionCallback<T>, timeout: number = 0, maxRetries = 10): Promise<T> {
const session = await this.acquire(timeout);
return this._withSession(session, callback, maxRetries);
}
*/
}

export class QueryClient extends EventEmitter {
private pool: QuerySessionsPool;

constructor(settings: IQueryClientSettings) {
super();
this.pool = new QuerySessionsPool(settings);
}

private async do<T>(callback: (session: QuerySession) => Promise<T>, timeout: number = 0): Promise<T> {
return this.pool.withSession(callback, timeout);
}

/*
private async withSession<T>(callback: (session: QuerySession) => Promise<T>, timeout: number = 0): Promise<T> {
return this.pool.withSession(callback, timeout);
}
private async withSessionRetry<T>(callback: (session: QuerySession) => Promise<T>, timeout: number = 0, maxRetries = 10): Promise<T> {
return this.pool.withSessionRetry(callback, timeout, maxRetries);
}
*/

public async destroy() {
await this.pool.destroy();
}
}
1 change: 1 addition & 0 deletions src/table/table-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ export class TableSession extends EventEmitter implements ICreateSessionResult {
txSettings: ITransactionSettings,
settings?: BeginTransactionSettings,
): Promise<ITransactionMeta> {
// TODO: Cosider keep txId in the session (one transaction in a time in a session policy) as i n query service
const request: Ydb.Table.IBeginTransactionRequest = {
sessionId: this.sessionId,
txSettings,
Expand Down
6 changes: 3 additions & 3 deletions src/table/table-sessions-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ export class TableSessionsPool extends EventEmitter {
this.sessionsBeingDeleted++;
// acquire new session as soon one of existing ones is deleted
if (this.waiters.length > 0) {
this.acquire().then((session) => {
if (!this.maybeUseSession(session)) {
session.release();
this.acquire().then((newSession) => {
if (!this.maybeUseSession(newSession)) {
newSession.release();
}
});
}
Expand Down

0 comments on commit 7a53a81

Please sign in to comment.