diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d8044662..43c0a1a6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,4 +13,4 @@ jobs: with: github-token: ${{ secrets.YDB_PLATFORM_BOT_TOKEN_REPO }} npm-token: ${{ secrets.NODE_AUTH_TOKEN }} - npm-dist-tag: beta + npm-dist-tag: rc diff --git a/package-lock.json b/package-lock.json index 07f6c8cc..d87ece2a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "luxon": "^3.2.1", "reflect-metadata": "^0.1.13", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.1.0" + "ydb-sdk-proto": "^1.2.1" }, "devDependencies": { "@commitlint/cli": "^17.6.1", @@ -9236,11 +9236,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/undici-types": { - "version": "5.26.5", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", - "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" - }, "node_modules/universalify": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", @@ -9644,45 +9639,9 @@ } }, "node_modules/ydb-sdk-proto": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.0.tgz", - "integrity": "sha512-t6eSjDYr+63o+pHQSbWc14nTx4B1UZRiwaI6cMNSbidjEnqMSzl7KS0qoZqyFejjIM+u5FNbmn9/8UInzBMPlg==", - "dependencies": { - "protobufjs": "^6.11.3" - } - }, - "node_modules/ydb-sdk-proto/node_modules/@types/node": { - "version": "20.11.6", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.11.6.tgz", - "integrity": "sha512-+EOokTnksGVgip2PbYbr3xnR7kZigh4LbybAfBAw5BpnQ+FqBYUsvCEjYd70IXKlbohQ64mzEYmMtlWUY8q//Q==", - "dependencies": { - "undici-types": "~5.26.4" - } - }, - "node_modules/ydb-sdk-proto/node_modules/protobufjs": { - "version": "6.11.4", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", - "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", - "hasInstallScript": true, - "dependencies": { - "@protobufjs/aspromise": "^1.1.2", - "@protobufjs/base64": "^1.1.2", - "@protobufjs/codegen": "^2.0.4", - "@protobufjs/eventemitter": "^1.1.0", - "@protobufjs/fetch": "^1.1.0", - "@protobufjs/float": "^1.0.2", - "@protobufjs/inquire": "^1.1.0", - "@protobufjs/path": "^1.1.2", - "@protobufjs/pool": "^1.1.0", - "@protobufjs/utf8": "^1.1.0", - "@types/long": "^4.0.1", - "@types/node": ">=13.7.0", - "long": "^4.0.0" - }, - "bin": { - "pbjs": "bin/pbjs", - "pbts": "bin/pbts" - } + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", + "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" }, "node_modules/yn": { "version": "3.1.1", @@ -16706,11 +16665,6 @@ "which-boxed-primitive": "^1.0.2" } }, - "undici-types": { - "version": "5.26.5", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", - "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" - }, "universalify": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.1.2.tgz", @@ -17026,42 +16980,9 @@ "dev": true }, "ydb-sdk-proto": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.0.tgz", - "integrity": "sha512-t6eSjDYr+63o+pHQSbWc14nTx4B1UZRiwaI6cMNSbidjEnqMSzl7KS0qoZqyFejjIM+u5FNbmn9/8UInzBMPlg==", - "requires": { - "protobufjs": "^6.11.3" - }, - "dependencies": { - "@types/node": { - "version": "20.11.6", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.11.6.tgz", - "integrity": "sha512-+EOokTnksGVgip2PbYbr3xnR7kZigh4LbybAfBAw5BpnQ+FqBYUsvCEjYd70IXKlbohQ64mzEYmMtlWUY8q//Q==", - "requires": { - "undici-types": "~5.26.4" - } - }, - "protobufjs": { - "version": "6.11.4", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", - "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", - "requires": { - "@protobufjs/aspromise": "^1.1.2", - "@protobufjs/base64": "^1.1.2", - "@protobufjs/codegen": "^2.0.4", - "@protobufjs/eventemitter": "^1.1.0", - "@protobufjs/fetch": "^1.1.0", - "@protobufjs/float": "^1.0.2", - "@protobufjs/inquire": "^1.1.0", - "@protobufjs/path": "^1.1.2", - "@protobufjs/pool": "^1.1.0", - "@protobufjs/utf8": "^1.1.0", - "@types/long": "^4.0.1", - "@types/node": ">=13.7.0", - "long": "^4.0.0" - } - } - } + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ydb-sdk-proto/-/ydb-sdk-proto-1.2.1.tgz", + "integrity": "sha512-6/p+LggVbglreOTvqW1PZPvrO0LEVyj+fsSmvH2BCzTm1jAWbqXMocQFBfCRzbOhk07L03/KRS82mS7gIfQ+sA==" }, "yn": { "version": "3.1.1", diff --git a/package.json b/package.json index 613c964c..5028b06d 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "luxon": "^3.2.1", "reflect-metadata": "^0.1.13", "uuid": "^8.3.2", - "ydb-sdk-proto": "^1.1.0" + "ydb-sdk-proto": "^1.2.1" }, "devDependencies": { "@commitlint/cli": "^17.6.1", diff --git a/query/index.ts b/query/index.ts new file mode 100644 index 00000000..a9081699 --- /dev/null +++ b/query/index.ts @@ -0,0 +1 @@ +export * from './query-client'; diff --git a/query/query-client.ts b/query/query-client.ts new file mode 100644 index 00000000..22968b47 --- /dev/null +++ b/query/query-client.ts @@ -0,0 +1,68 @@ +import {Ydb} from "ydb-sdk-proto"; +import QueryService = Ydb.Query.V1.QueryService; +import CreateSessionRequest = Ydb.Query.CreateSessionRequest; +import CreateSessionResult = Ydb.Query.CreateSessionResult; +import EventEmitter from "events"; +import {IClientSettings, SessionPool} from "./session-pool"; +import {Endpoint} from "../discovery"; +import {IAuthService} from "../credentials"; +import {Logger} from "../logging"; +import {ISslCredentials} from "../ssl-credentials"; +import {AuthenticatedService, ClientOptions, getOperationPayload, pessimizable} from "../utils"; +import {SessionCreator} from "../table"; +import {retryable} from "../retries"; +import {QuerySession} from "./query-session"; + +export class QuerySessionCreator extends AuthenticatedService { + public endpoint: Endpoint; + private readonly logger: Logger; + + constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { + const host = endpoint.toString(); + super(host, database, 'Ydb.Query.V1.QueryService', QueryService, authService, sslCredentials, clientOptions); + this.endpoint = endpoint; + this.logger = logger; + } + + @retryable() + @pessimizable + async create(): Promise { + const response = await this.api.createSession(CreateSessionRequest.create()); + const payload = getOperationPayload(response); + const {sessionId} = CreateSessionResult.decode(payload); + return new QuerySession(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); + } +} + +class QuerySessionPool extends SessionPool { + protected getSessionServiceCreator( + endpoint: Endpoint, + database: string, + authService: IAuthService, + logger: Logger, + sslCredentials: ISslCredentials | undefined, + clientOptions: ClientOptions | undefined): SessionCreator { + return new QuerySessionCreator(endpoint, database, authService, logger, sslCredentials, clientOptions); + } +} + +export class QueryClient extends EventEmitter { + private pool: QuerySessionPool; + + constructor(settings: IClientSettings) { + super(); + this.pool = new QuerySessionPool(settings); + } + + public async withSession(callback: (session: QuerySession) => Promise, timeout: number = 0): Promise { + return this.pool.withSession(callback, timeout); + } + + public async withSessionRetry(callback: (session: QuerySession) => Promise, timeout: number = 0, maxRetries = 10): Promise { + return this.pool.withSessionRetry(callback, timeout, maxRetries); + } + + public async destroy() { + await this.pool.destroy(); + } +} diff --git a/query/query-session.ts b/query/query-session.ts new file mode 100644 index 00000000..eed142cd --- /dev/null +++ b/query/query-session.ts @@ -0,0 +1,21 @@ +import {Session} from "../table"; +import {Endpoint} from "../discovery"; +import {Logger} from "../logging"; +import * as grpc from "@grpc/grpc-js"; +import {Ydb} from "ydb-sdk-proto"; +import QueryService = Ydb.Query.V1.QueryService; + +export class QuerySession extends Session { + + constructor( + api: QueryService, + endpoint: Endpoint, + sessionId: string, + logger: Logger, + getResponseMetadata: (request: object) => grpc.Metadata | undefined + ) { + super(api, endpoint, sessionId, logger, getResponseMetadata); + } + + // TODO: Add methods +} diff --git a/src/__tests__/alter-table.test.ts b/src/__tests__/alter-table.test.ts index 190c38e2..9dc40332 100644 --- a/src/__tests__/alter-table.test.ts +++ b/src/__tests__/alter-table.test.ts @@ -6,7 +6,7 @@ import { Column, TableDescription, TableIndex, -} from '../table/table-client'; +} from '../table/table-session'; import { Types } from '../types'; import {OperationParams} from "../table/session"; diff --git a/src/__tests__/bulk-upsert.test.ts b/src/__tests__/bulk-upsert.test.ts index 3bbf7b4c..b10dd4b1 100644 --- a/src/__tests__/bulk-upsert.test.ts +++ b/src/__tests__/bulk-upsert.test.ts @@ -7,7 +7,7 @@ import { Row, TABLE } from '../test-utils'; -import {TableSession} from '../table/table-client'; +import {TableSession} from '../table/table-session'; import {Ydb} from 'ydb-sdk-proto'; async function readTable(session: TableSession): Promise { diff --git a/src/__tests__/bytestring-identity.test.ts b/src/__tests__/bytestring-identity.test.ts index 6aa5d46e..ec2819c5 100644 --- a/src/__tests__/bytestring-identity.test.ts +++ b/src/__tests__/bytestring-identity.test.ts @@ -1,6 +1,6 @@ import Driver from '../driver'; import {destroyDriver, initDriver, TABLE} from '../test-utils'; -import {Column, TableSession, TableDescription} from '../table/table-client'; +import {Column, TableSession, TableDescription} from '../table/table-session'; import {declareType, TypedData, Types} from '../types'; import {withRetries} from '../retries'; diff --git a/src/__tests__/create-table.test.ts b/src/__tests__/create-table.test.ts index 598cd1d9..939352e3 100644 --- a/src/__tests__/create-table.test.ts +++ b/src/__tests__/create-table.test.ts @@ -1,6 +1,6 @@ import Driver from '../driver'; import {destroyDriver, initDriver} from '../test-utils'; -import {Column, DescribeTableSettings, TableDescription} from '../table/table-client'; +import {Column, DescribeTableSettings, TableDescription} from '../table/table-session'; import {TypedValues, Types} from '../types'; import Long from 'long'; import {Ydb} from 'ydb-sdk-proto'; diff --git a/src/__tests__/read-table.test.ts b/src/__tests__/read-table.test.ts index 696c4da0..b83af987 100644 --- a/src/__tests__/read-table.test.ts +++ b/src/__tests__/read-table.test.ts @@ -7,7 +7,7 @@ import { Row, TABLE } from '../test-utils'; -import {ReadTableSettings, TableSession} from '../table/table-client'; +import {ReadTableSettings, TableSession} from '../table/table-session'; import {TypedValues, TypedData} from '../types'; async function readTable(session: TableSession, settings: ReadTableSettings): Promise { diff --git a/src/__tests__/scan-query.test.ts b/src/__tests__/scan-query.test.ts index 0a61a9d4..befba963 100644 --- a/src/__tests__/scan-query.test.ts +++ b/src/__tests__/scan-query.test.ts @@ -7,7 +7,7 @@ import { initDriver, Row, } from '../test-utils'; -import {TableSession} from '../table/table-client'; +import {TableSession} from '../table/table-session'; import {TypedData} from '../types'; async function executeScanQuery(session: TableSession): Promise { diff --git a/src/driver.ts b/src/driver.ts index 250207b9..ce82390a 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -1,14 +1,14 @@ import DiscoveryService from './discovery'; -import {TableClient} from './table/table-client'; -import SchemeService from './scheme'; +import {SchemeClient} from './scheme'; +// import {QueryClient} from './query/query-client'; import {ENDPOINT_DISCOVERY_PERIOD} from './constants'; import {IAuthService} from './credentials'; import {TimeoutExpired} from './errors'; import {getLogger, Logger} from './logging'; -import SchemeClient from './scheme'; import {ClientOptions} from './utils'; import {parseConnectionString} from './parse-connection-string'; import {makeSslCredentials, ISslCredentials} from './ssl-credentials'; +import {TableClient} from "./table/table-client"; export interface IPoolSettings { minLimit?: number; @@ -38,7 +38,8 @@ export default class Driver { private discoveryService: DiscoveryService; public tableClient: TableClient; - public schemeClient: SchemeService; + public schemeClient: SchemeClient; + // public queryClient: QueryClient; constructor(settings: IDriverSettings) { this.logger = settings.logger || getLogger(); @@ -79,6 +80,15 @@ export default class Driver { discoveryService: this.discoveryService, logger: this.logger, }); + // this.queryClient = new QueryClient({ + // database: this.database, + // authService: this.authService, + // sslCredentials: this.sslCredentials, + // poolSettings: this.poolSettings, + // clientOptions: this.clientOptions, + // discoveryService: this.discoveryService, + // logger: this.logger, + // }); this.schemeClient = new SchemeClient({ database: this.database, authService: this.authService, diff --git a/src/index.ts b/src/index.ts index a823fbd9..c8f112e9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,7 +51,7 @@ export { ExecutionPolicy, CachingPolicy, -} from './table/table-client'; +} from './table/table-session'; export { MakeDirectorySettings, RemoveDirectorySettings, diff --git a/src/scheme.ts b/src/scheme.ts index 6b2786fe..321a6095 100644 --- a/src/scheme.ts +++ b/src/scheme.ts @@ -70,7 +70,7 @@ interface ISchemeClientSettings { logger: Logger; } -export default class SchemeClient extends EventEmitter { +export class SchemeClient extends EventEmitter { private schemeServices: Map; constructor(private settings: ISchemeClientSettings) { diff --git a/src/table/index.ts b/src/table/index.ts index b1758268..72657904 100644 --- a/src/table/index.ts +++ b/src/table/index.ts @@ -1,3 +1,4 @@ export * from './session-pool'; export * from './session'; export * from './table-client'; +export * from './table-session'; diff --git a/src/table/session.ts b/src/table/session.ts index 1df8187a..5dbba772 100644 --- a/src/table/session.ts +++ b/src/table/session.ts @@ -1,7 +1,4 @@ -import {google, Ydb} from "ydb-sdk-proto"; -import ITransactionSettings = Ydb.Table.ITransactionSettings; -import BeginTransactionResult = Ydb.Table.BeginTransactionResult; -import ITransactionMeta = Ydb.Table.ITransactionMeta; +import {Ydb} from "ydb-sdk-proto"; export import OperationMode = Ydb.Operations.OperationParams.OperationMode; import EventEmitter from "events"; import {Endpoint} from "../discovery"; @@ -9,25 +6,10 @@ import {Logger} from "../logging"; import * as grpc from "@grpc/grpc-js"; import {SessionEvent} from "./session-pool"; import {retryable} from "../retries"; -import {AsyncResponse, ensureOperationSucceeded, getOperationPayload, pessimizable, StreamEnd} from "../utils"; +import {AsyncResponse, ensureOperationSucceeded, pessimizable, StreamEnd} from "../utils"; import {ResponseMetadataKeys} from "../constants"; import {MissingValue, YdbError} from "../errors"; - -export interface IExistingTransaction { - txId: string -} - -export interface INewTransaction { - beginTx: ITransactionSettings, - commitTx: boolean -} - -export const AUTO_TX: INewTransaction = { - beginTx: { - serializableReadWrite: {} - }, - commitTx: true -}; +import * as $protobuf from "protobufjs"; interface PartialResponse { status?: (Ydb.StatusIds.StatusCode|null); @@ -35,93 +17,18 @@ interface PartialResponse { result?: (T|null); } -export class OperationParams implements Ydb.Operations.IOperationParams { - operationMode?: OperationMode; - operationTimeout?: google.protobuf.IDuration; - cancelAfter?: google.protobuf.IDuration; - labels?: { [k: string]: string }; - reportCostInfo?: Ydb.FeatureFlag.Status; - - withSyncMode() { - this.operationMode = OperationMode.SYNC; - return this; - } - - withAsyncMode() { - this.operationMode = OperationMode.ASYNC; - return this; - } - - withOperationTimeout(duration: google.protobuf.IDuration) { - this.operationTimeout = duration; - return this; - } - - withOperationTimeoutSeconds(seconds: number) { - this.operationTimeout = {seconds}; - return this; - } - - withCancelAfter(duration: google.protobuf.IDuration) { - this.cancelAfter = duration; - return this; - } - - withCancelAfterSeconds(seconds: number) { - this.cancelAfter = {seconds}; - return this; - } - - withLabels(labels: { [k: string]: string }) { - this.labels = labels; - return this; - } - - withReportCostInfo() { - this.reportCostInfo = Ydb.FeatureFlag.Status.ENABLED; - return this; - } -} - -export class OperationParamsSettings { - operationParams?: OperationParams; - - withOperationParams(operationParams: OperationParams) { - this.operationParams = operationParams; - return this; - } -} - -export class BeginTransactionSettings extends OperationParamsSettings { -} - -export class CommitTransactionSettings extends OperationParamsSettings { - collectStats?: Ydb.Table.QueryStatsCollection.Mode; - - withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { - this.collectStats = collectStats; - return this; - } -} -export class RollbackTransactionSettings extends OperationParamsSettings { -} - -export interface ServiceWithSessionsAndTransactions extends EventEmitter { +export interface ServiceWithSessions extends EventEmitter { createSession(request: Ydb.Table.ICreateSessionRequest): Promise; deleteSession(request: Ydb.Table.IDeleteSessionRequest): Promise; - keepAlive?(request: Ydb.Table.IKeepAliveRequest): Promise; - - beginTransaction(request: Ydb.Table.IBeginTransactionRequest): Promise; - commitTransaction(request: Ydb.Table.ICommitTransactionRequest): Promise; - rollbackTransaction(request: Ydb.Table.IRollbackTransactionRequest): Promise; + Alive?(request: Ydb.Table.IKeepAliveRequest): Promise; } export interface SessionCreator> { create(): Promise; } -export class Session extends EventEmitter implements Ydb.Table.ICreateSessionResult { +export abstract class Session extends EventEmitter implements Ydb.Table.ICreateSessionResult { private beingDeleted = false; private free = true; private closing = false; @@ -164,68 +71,6 @@ export class Session ext ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); } - @retryable() - @pessimizable - public async keepAlive(): Promise { - if (typeof this.api.keepAlive !== 'function') { - throw new Error('This service does not support keep alive method'); - } - const request = {sessionId: this.sessionId}; - const response = await this.api.keepAlive(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - - @retryable() - @pessimizable - public async beginTransaction( - txSettings: ITransactionSettings, - settings?: BeginTransactionSettings, - ): Promise { - const request: Ydb.Table.IBeginTransactionRequest = { - sessionId: this.sessionId, - txSettings, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.beginTransaction(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - const {txMeta} = BeginTransactionResult.decode(payload); - if (txMeta) { - return txMeta; - } - throw new Error('Could not begin new transaction, txMeta is empty!'); - } - - @retryable() - @pessimizable - public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { - const request: Ydb.Table.ICommitTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; - request.collectStats = settings.collectStats; - } - const response = await this.api.commitTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - - @retryable() - @pessimizable - public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { - const request: Ydb.Table.IRollbackTransactionRequest = { - sessionId: this.sessionId, - txId: txControl.txId, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.rollbackTransaction(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - protected processResponseMetadata( request: object, response: AsyncResponse, diff --git a/src/table/table-client.ts b/src/table/table-client.ts index 339e416f..7580d73b 100644 --- a/src/table/table-client.ts +++ b/src/table/table-client.ts @@ -1,47 +1,18 @@ -import EventEmitter from 'events'; -import * as grpc from '@grpc/grpc-js'; -import {Ydb} from 'ydb-sdk-proto'; -import { - AuthenticatedService, - ClientOptions, - ensureOperationSucceeded, - getOperationPayload, - pessimizable, -} from '../utils'; -import {Endpoint} from '../discovery'; -import {ISslCredentials} from '../ssl-credentials'; -import {IAuthService} from '../credentials'; -// noinspection ES6PreferShortImport -import {Logger} from '../logging'; -import {retryable} from '../retries'; -import {MissingStatus, SchemeError} from '../errors'; -import {IClientSettings, SessionPool} from "./session-pool"; - -import { - AUTO_TX, - IExistingTransaction, - INewTransaction, - OperationMode, - OperationParamsSettings, - Session, SessionCreator -} from "./session"; +import {Ydb} from "ydb-sdk-proto"; import TableService = Ydb.Table.V1.TableService; import CreateSessionRequest = Ydb.Table.CreateSessionRequest; import CreateSessionResult = Ydb.Table.CreateSessionResult; -import IQuery = Ydb.Table.IQuery; -import IType = Ydb.IType; -import DescribeTableResult = Ydb.Table.DescribeTableResult; -import PrepareQueryResult = Ydb.Table.PrepareQueryResult; -import ExecuteQueryResult = Ydb.Table.ExecuteQueryResult; -import ExplainQueryResult = Ydb.Table.ExplainQueryResult; -import AutoPartitioningPolicy = Ydb.Table.PartitioningPolicy.AutoPartitioningPolicy; -import ITypedValue = Ydb.ITypedValue; -import FeatureFlag = Ydb.FeatureFlag.Status; -import Compression = Ydb.Table.ColumnFamilyPolicy.Compression; -import ExecuteScanQueryPartialResult = Ydb.Table.ExecuteScanQueryPartialResult; -import IKeyRange = Ydb.Table.IKeyRange; -import TypedValue = Ydb.TypedValue; -import BulkUpsertResult = Ydb.Table.BulkUpsertResult; +import EventEmitter from "events"; +import {IClientSettings, SessionPool} from "./session-pool"; +import {Endpoint} from "../discovery"; +import {IAuthService} from "../credentials"; +import {Logger} from "../logging"; +import {ISslCredentials} from "../ssl-credentials"; +import {AuthenticatedService, ClientOptions, getOperationPayload, pessimizable} from "../utils"; +import {SessionCreator} from "./session"; +import {retryable} from "../retries"; +import {TableSession} from "./table-session"; +import TransactionSettings = Ydb.Query.TransactionSettings; export class TableSessionCreator extends AuthenticatedService { public endpoint: Endpoint; @@ -53,7 +24,7 @@ export class TableSessionCreator extends AuthenticatedService { this.endpoint = endpoint; this.logger = logger; } - +7 @retryable() @pessimizable async create(): Promise { @@ -64,392 +35,6 @@ export class TableSessionCreator extends AuthenticatedService { } } -interface IQueryParams { - [k: string]: Ydb.ITypedValue -} - -export class CreateTableSettings extends OperationParamsSettings { -} - -export class AlterTableSettings extends OperationParamsSettings { -} - -interface IDropTableSettings { - muteNonExistingTableErrors: boolean; -} -export class DropTableSettings extends OperationParamsSettings { - muteNonExistingTableErrors: boolean; - - constructor({muteNonExistingTableErrors = true} = {} as IDropTableSettings) { - super(); - this.muteNonExistingTableErrors = muteNonExistingTableErrors; - } -} - -export class DescribeTableSettings extends OperationParamsSettings { - includeShardKeyBounds?: boolean; - includeTableStats?: boolean; - includePartitionStats?: boolean; - - withIncludeShardKeyBounds(includeShardKeyBounds: boolean) { - this.includeShardKeyBounds = includeShardKeyBounds; - return this; - } - - withIncludeTableStats(includeTableStats: boolean) { - this.includeTableStats = includeTableStats; - return this; - } - - withIncludePartitionStats(includePartitionStats: boolean) { - this.includePartitionStats = includePartitionStats; - return this; - } -} - -export class PrepareQuerySettings extends OperationParamsSettings { -} - -export class ExecuteQuerySettings extends OperationParamsSettings { - keepInCache: boolean = false; - collectStats?: Ydb.Table.QueryStatsCollection.Mode; - onResponseMetadata?: (metadata: grpc.Metadata) => void; - - withKeepInCache(keepInCache: boolean) { - this.keepInCache = keepInCache; - return this; - } - - withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { - this.collectStats = collectStats; - return this; - } -} - -export class BulkUpsertSettings extends OperationParamsSettings { -} - -export class ReadTableSettings { - columns?: string[]; - ordered?: boolean; - rowLimit?: number; - keyRange?: Ydb.Table.IKeyRange; - - withRowLimit(rowLimit: number) { - this.rowLimit = rowLimit; - return this; - } - - withColumns(...columns: string[]) { - this.columns = columns; - return this; - } - - withOrdered(ordered: boolean) { - this.ordered = ordered; - return this; - } - - withKeyRange(keyRange: IKeyRange) { - this.keyRange = keyRange; - return this; - } - - withKeyGreater(value: ITypedValue) { - this.getOrInitKeyRange().greater = value; - return this; - } - - withKeyGreaterOrEqual(value: ITypedValue) { - this.getOrInitKeyRange().greaterOrEqual = value; - return this; - } - - withKeyLess(value: ITypedValue) { - this.getOrInitKeyRange().less = value; - return this; - } - - withKeyLessOrEqual(value: ITypedValue) { - this.getOrInitKeyRange().lessOrEqual = value; - return this; - } - - private getOrInitKeyRange() { - if (!this.keyRange) { - this.keyRange = {}; - } - return this.keyRange; - } -} - -export class ExecuteScanQuerySettings { - mode?: Ydb.Table.ExecuteScanQueryRequest.Mode; - collectStats?: Ydb.Table.QueryStatsCollection.Mode; - - withMode(mode: Ydb.Table.ExecuteScanQueryRequest.Mode) { - this.mode = mode; - return this; - } - - withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { - this.collectStats = collectStats; - return this; - } -} - -export class TableSession extends Session { - - constructor( - api: TableService, - endpoint: Endpoint, - sessionId: string, - logger: Logger, - getResponseMetadata: (request: object) => grpc.Metadata | undefined - ) { - super(api, endpoint, sessionId, logger, getResponseMetadata); - } - - @retryable() - @pessimizable - public async createTable( - tablePath: string, - description: TableDescription, - settings?: CreateTableSettings, - ): Promise { - const request: Ydb.Table.ICreateTableRequest = { - ...description, - sessionId: this.sessionId, - path: `${this.endpoint.database}/${tablePath}`, - }; - - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.createTable(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } - - @retryable() - @pessimizable - public async alterTable( - tablePath: string, - description: AlterTableDescription, - settings?: AlterTableSettings - ): Promise { - const request: Ydb.Table.IAlterTableRequest = { - ...description, - sessionId: this.sessionId, - path: `${this.endpoint.database}/${tablePath}`, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - - const response = await this.api.alterTable(request); - try { - ensureOperationSucceeded(this.processResponseMetadata(request, response)); - } catch (error) { - // !! does not returns response status if async operation mode - if (request.operationParams?.operationMode !== OperationMode.SYNC && error instanceof MissingStatus) return; - throw error; - } - } - - /* - Drop table located at `tablePath` in the current database. By default dropping non-existent tables does not - throw an error, to throw an error pass `new DropTableSettings({muteNonExistingTableErrors: true})` as 2nd argument. - */ - @retryable() - @pessimizable - public async dropTable(tablePath: string, settings?: DropTableSettings): Promise { - const request: Ydb.Table.IDropTableRequest = { - sessionId: this.sessionId, - path: `${this.endpoint.database}/${tablePath}`, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - settings = settings || new DropTableSettings(); - const suppressedErrors = settings?.muteNonExistingTableErrors ? [SchemeError.status] : []; - const response = await this.api.dropTable(request); - ensureOperationSucceeded(this.processResponseMetadata(request, response), suppressedErrors); - } - - @retryable() - @pessimizable - public async describeTable(tablePath: string, settings?: DescribeTableSettings): Promise { - const request: Ydb.Table.IDescribeTableRequest = { - sessionId: this.sessionId, - path: `${this.endpoint.database}/${tablePath}`, - operationParams: settings?.operationParams, - }; - - if (settings) { - request.includeTableStats = settings.includeTableStats; - request.includeShardKeyBounds = settings.includeShardKeyBounds; - request.includePartitionStats = settings.includePartitionStats; - request.operationParams = settings.operationParams; - } - - const response = await this.api.describeTable(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - return DescribeTableResult.decode(payload); - } - - @retryable() - @pessimizable - public async describeTableOptions( - settings?: DescribeTableSettings, - ): Promise { - const request: Ydb.Table.IDescribeTableOptionsRequest = { - operationParams: settings?.operationParams, - }; - - const response = await this.api.describeTableOptions(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - return Ydb.Table.DescribeTableOptionsResult.decode(payload); - } - - @retryable() - @pessimizable - public async prepareQuery(queryText: string, settings?: PrepareQuerySettings): Promise { - const request: Ydb.Table.IPrepareDataQueryRequest = { - sessionId: this.sessionId, - yqlText: queryText, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.prepareDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - return PrepareQueryResult.decode(payload); - } - - @pessimizable - public async executeQuery( - query: PrepareQueryResult | string, - params: IQueryParams = {}, - txControl: IExistingTransaction | INewTransaction = AUTO_TX, - settings?: ExecuteQuerySettings, - ): Promise { - this.logger.trace('preparedQuery %o', query); - this.logger.trace('parameters %o', params); - let queryToExecute: IQuery; - let keepInCache = false; - if (typeof query === 'string') { - queryToExecute = { - yqlText: query - }; - if (settings?.keepInCache !== undefined) { - keepInCache = settings.keepInCache; - } - } else { - queryToExecute = { - id: query.queryId - }; - } - const request: Ydb.Table.IExecuteDataQueryRequest = { - sessionId: this.sessionId, - txControl, - parameters: params, - query: queryToExecute, - }; - if (settings) { - request.operationParams = settings.operationParams; - request.collectStats = settings.collectStats; - } - if (keepInCache) { - request.queryCachePolicy = {keepInCache}; - } - const response = await this.api.executeDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata)); - return ExecuteQueryResult.decode(payload); - } - - @pessimizable - public async bulkUpsert(tablePath: string, rows: TypedValue, settings?: BulkUpsertSettings) { - const request: Ydb.Table.IBulkUpsertRequest = { - table: `${this.endpoint.database}/${tablePath}`, - rows, - }; - if (settings) { - request.operationParams = settings.operationParams; - } - const response = await this.api.bulkUpsert(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - return BulkUpsertResult.decode(payload); - } - - @pessimizable - public async streamReadTable( - tablePath: string, - consumer: (result: Ydb.Table.ReadTableResult) => void, - settings?: ReadTableSettings): Promise { - const request: Ydb.Table.IReadTableRequest = { - sessionId: this.sessionId, - path: `${this.endpoint.database}/${tablePath}`, - }; - if (settings) { - request.columns = settings.columns; - request.ordered = settings.ordered; - request.rowLimit = settings.rowLimit; - request.keyRange = settings.keyRange; - } - - return this.executeStreamRequest( - request, - this.api.streamReadTable.bind(this.api), - Ydb.Table.ReadTableResult.create, - consumer); - } - - @pessimizable - public async streamExecuteScanQuery( - query: PrepareQueryResult | string, - consumer: (result: ExecuteScanQueryPartialResult) => void, - params: IQueryParams = {}, - settings?: ExecuteScanQuerySettings): Promise { - let queryToExecute: IQuery; - if (typeof query === 'string') { - queryToExecute = { - yqlText: query - }; - } else { - queryToExecute = { - id: query.queryId - }; - } - - const request: Ydb.Table.IExecuteScanQueryRequest = { - query: queryToExecute, - parameters: params, - mode: settings?.mode || Ydb.Table.ExecuteScanQueryRequest.Mode.MODE_EXEC, - }; - - if (settings) { - request.collectStats = settings.collectStats; - } - - return this.executeStreamRequest( - request, - this.api.streamExecuteScanQuery.bind(this.api), - ExecuteScanQueryPartialResult.create, - consumer); - } - - public async explainQuery(query: string, operationParams?: Ydb.Operations.IOperationParams): Promise { - const request: Ydb.Table.IExplainDataQueryRequest = { - sessionId: this.sessionId, - yqlText: query, - operationParams - }; - const response = await this.api.explainDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - return ExplainQueryResult.decode(payload); - } -} - class TableSessionPool extends SessionPool { protected getSessionServiceCreator( endpoint: Endpoint, @@ -483,364 +68,6 @@ export class TableClient extends EventEmitter { } } -export class Column implements Ydb.Table.IColumnMeta { - constructor(public name: string, public type: IType, public family?: string) {} -} - -export class StorageSettings implements Ydb.Table.IStoragePool { - constructor(public media: string) {} -} - -export class ColumnFamilyPolicy implements Ydb.Table.IColumnFamilyPolicy { - public name?: string; - public data?: StorageSettings; - public external?: StorageSettings; - public keepInMemory?: FeatureFlag; - public compression?: Compression; - - withName(name: string) { - this.name = name; - return this; - } - - withData(data: StorageSettings) { - this.data = data; - return this; - } - - withExternal(external: StorageSettings) { - this.external = external; - return this; - } - - withKeepInMemory(keepInMemory: FeatureFlag) { - this.keepInMemory = keepInMemory; - return this; - } - - withCompression(compression: Compression) { - this.compression = compression; - return this; - } -} - -export class StoragePolicy implements Ydb.Table.IStoragePolicy { - public presetName?: string; - public syslog?: StorageSettings; - public log?: StorageSettings; - public data?: StorageSettings; - public external?: StorageSettings; - public keepInMemory?: FeatureFlag; - public columnFamilies: ColumnFamilyPolicy[] = []; - - withPresetName(presetName: string) { - this.presetName = presetName; - return this; - } - - withSyslog(syslog: StorageSettings) { - this.syslog = syslog; - return this; - } - - withLog(log: StorageSettings) { - this.log = log; - return this; - } - - withData(data: StorageSettings) { - this.data = data; - return this; - } - - withExternal(external: StorageSettings) { - this.external = external; - return this; - } - - withKeepInMemory(keepInMemory: FeatureFlag) { - this.keepInMemory = keepInMemory; - return this; - } - - withColumnFamilies(...columnFamilies: ColumnFamilyPolicy[]) { - for (const policy of columnFamilies) { - this.columnFamilies.push(policy); - } - return this; - } -} - -export class ExplicitPartitions implements Ydb.Table.IExplicitPartitions { - constructor(public splitPoints: ITypedValue[]) {} -} - -export class PartitioningPolicy implements Ydb.Table.IPartitioningPolicy { - public presetName?: string; - public autoPartitioning?: AutoPartitioningPolicy; - public uniformPartitions?: number; - public explicitPartitions?: ExplicitPartitions; - - withPresetName(presetName: string) { - this.presetName = presetName; - return this; - } - - withUniformPartitions(uniformPartitions: number) { - this.uniformPartitions = uniformPartitions; - return this; - } - - withAutoPartitioning(autoPartitioning: AutoPartitioningPolicy) { - this.autoPartitioning = autoPartitioning; - return this; - } - - withExplicitPartitions(explicitPartitions: ExplicitPartitions) { - this.explicitPartitions = explicitPartitions; - return this; - } -} - -export class ReplicationPolicy implements Ydb.Table.IReplicationPolicy { - presetName?: string; - replicasCount?: number; - createPerAvailabilityZone?: FeatureFlag; - allowPromotion?: FeatureFlag; - - withPresetName(presetName: string) { - this.presetName = presetName; - return this; - } - - withReplicasCount(replicasCount: number) { - this.replicasCount = replicasCount; - return this; - } - - withCreatePerAvailabilityZone(createPerAvailabilityZone: FeatureFlag) { - this.createPerAvailabilityZone = createPerAvailabilityZone; - return this; - } - - withAllowPromotion(allowPromotion: FeatureFlag) { - this.allowPromotion = allowPromotion; - return this; - } -} - -export class CompactionPolicy implements Ydb.Table.ICompactionPolicy { - constructor(public presetName: string) {} -} - -export class ExecutionPolicy implements Ydb.Table.IExecutionPolicy { - constructor(public presetName: string) {} -} - -export class CachingPolicy implements Ydb.Table.ICachingPolicy { - constructor(public presetName: string) {} -} - -export class TableProfile implements Ydb.Table.ITableProfile { - public presetName?: string; - public storagePolicy?: StoragePolicy; - public compactionPolicy?: CompactionPolicy; - public partitioningPolicy?: PartitioningPolicy; - public executionPolicy?: ExecutionPolicy; - public replicationPolicy?: ReplicationPolicy; - public cachingPolicy?: CachingPolicy; - - withPresetName(presetName: string) { - this.presetName = presetName; - return this; - } - - withStoragePolicy(storagePolicy: StoragePolicy) { - this.storagePolicy = storagePolicy; - return this; - } - - withCompactionPolicy(compactionPolicy: CompactionPolicy) { - this.compactionPolicy = compactionPolicy; - return this; - } - - withPartitioningPolicy(partitioningPolicy: PartitioningPolicy) { - this.partitioningPolicy = partitioningPolicy; - return this; - } - - withExecutionPolicy(executionPolicy: ExecutionPolicy) { - this.executionPolicy = executionPolicy; - return this; - } - - withReplicationPolicy(replicationPolicy: ReplicationPolicy) { - this.replicationPolicy = replicationPolicy; - return this; - } - - withCachingPolicy(cachingPolicy: CachingPolicy) { - this.cachingPolicy = cachingPolicy; - return this; - } -} - -export class TableIndex implements Ydb.Table.ITableIndex { - public indexColumns: string[] = []; - public dataColumns: string[] | null = null; - public globalIndex: Ydb.Table.IGlobalIndex|null = null; - public globalAsyncIndex: Ydb.Table.IGlobalAsyncIndex|null = null; - - constructor(public name: string) {} - - withIndexColumns(...indexColumns: string[]) { - this.indexColumns.push(...indexColumns); - return this; - } - - /** Adds [covering index](https://ydb.tech/en/docs/concepts/secondary_indexes#covering) over columns */ - withDataColumns(...dataColumns: string[]) { - if(!this.dataColumns) this.dataColumns = [] - this.dataColumns?.push(...dataColumns) - return this - } - - withGlobalAsync(isAsync: boolean) { - if(isAsync) { - this.globalAsyncIndex = new Ydb.Table.GlobalAsyncIndex() - this.globalIndex = null - } - else { - this.globalAsyncIndex = null - this.globalIndex = new Ydb.Table.GlobalIndex() - } - return this - } -} - -export class TtlSettings implements Ydb.Table.ITtlSettings { - public dateTypeColumn?: Ydb.Table.IDateTypeColumnModeSettings | null; - constructor(columnName: string, expireAfterSeconds: number = 0) { - this.dateTypeColumn = { columnName, expireAfterSeconds }; - } -} - -export class TableDescription implements Ydb.Table.ICreateTableRequest { - /** @deprecated use TableDescription options instead */ - public profile?: TableProfile; - public indexes: TableIndex[] = []; - public ttlSettings?: TtlSettings; - public partitioningSettings?: Ydb.Table.IPartitioningSettings; - public uniformPartitions?: number; - public columnFamilies?: Ydb.Table.IColumnFamily[]; - public attributes?: {[k: string]: string}; - public compactionPolicy?: 'default' | 'small_table' | 'log_table'; - public keyBloomFilter?: FeatureFlag; - public partitionAtKeys?: Ydb.Table.IExplicitPartitions; - public readReplicasSettings?: Ydb.Table.IReadReplicasSettings; - public storageSettings?: Ydb.Table.IStorageSettings; - // path and operationPrams defined in createTable, - // columns and primaryKey are in constructor - - constructor(public columns: Column[] = [], public primaryKey: string[] = []) {} - - withColumn(column: Column) { - this.columns.push(column); - return this; - } - - withColumns(...columns: Column[]) { - for (const column of columns) { - this.columns.push(column); - } - return this; - } - - withPrimaryKey(key: string) { - this.primaryKey.push(key); - return this; - } - - withPrimaryKeys(...keys: string[]) { - for (const key of keys) { - this.primaryKey.push(key); - } - return this; - } - - /** @deprecated use TableDescription options instead */ - withProfile(profile: TableProfile) { - this.profile = profile; - return this; - } - - withIndex(index: TableIndex) { - this.indexes.push(index); - return this; - } - - withIndexes(...indexes: TableIndex[]) { - for (const index of indexes) { - this.indexes.push(index); - } - return this; - } - - withTtl(columnName: string, expireAfterSeconds: number = 0) { - this.ttlSettings = new TtlSettings(columnName, expireAfterSeconds); - return this; - } - - withPartitioningSettings(partitioningSettings: Ydb.Table.IPartitioningSettings) { - this.partitioningSettings = partitioningSettings; - } -} - -export class AlterTableDescription { - public addColumns: Column[] = []; - public dropColumns: string[] = []; - public alterColumns: Column[] = []; - public setTtlSettings?: TtlSettings; - public dropTtlSettings?: {}; - public addIndexes: TableIndex[] = []; - public dropIndexes: string[] = []; - public alterStorageSettings?: Ydb.Table.IStorageSettings; - public addColumnFamilies?: Ydb.Table.IColumnFamily[]; - public alterColumnFamilies?: Ydb.Table.IColumnFamily[]; - public alterAttributes?: { [k: string]: string }; - public setCompactionPolicy?: string; - public alterPartitioningSettings?: Ydb.Table.IPartitioningSettings; - public setKeyBloomFilter?: Ydb.FeatureFlag.Status; - public setReadReplicasSettings?: Ydb.Table.IReadReplicasSettings; - public addChangefeeds?: Ydb.Table.IChangefeed[]; - public dropChangefeeds?: string[]; - public renameIndexes?: Ydb.Table.IRenameIndexItem[]; - - constructor() {} - - withAddColumn(column: Column) { - this.addColumns.push(column); - return this; - } - - withDropColumn(columnName: string) { - this.dropColumns.push(columnName); - return this; - } - - withAlterColumn(column: Column) { - this.alterColumns.push(column); - return this; - } +class Test extends TransactionSettings { - withSetTtl(columnName: string, expireAfterSeconds: number = 0) { - this.setTtlSettings = new TtlSettings(columnName, expireAfterSeconds); - return this; - } - - withDropTtl() { - this.dropTtlSettings = {}; - return this; - } } diff --git a/src/table/table-session.ts b/src/table/table-session.ts new file mode 100644 index 00000000..4189f348 --- /dev/null +++ b/src/table/table-session.ts @@ -0,0 +1,928 @@ +import * as grpc from '@grpc/grpc-js'; +import {google, Ydb} from 'ydb-sdk-proto'; +import BeginTransactionResult = Ydb.Table.BeginTransactionResult; +import ITransactionSettings = Ydb.Table.ITransactionSettings; +import ITransactionMeta = Ydb.Table.ITransactionMeta; +import {ensureOperationSucceeded, getOperationPayload, pessimizable,} from '../utils'; +import {Endpoint} from '../discovery'; +import {Logger} from '../logging'; +import {retryable} from '../retries'; +import {MissingStatus, SchemeError} from '../errors'; +import { + OperationMode, + Session +} from "./session"; +import TableService = Ydb.Table.V1.TableService; +import IQuery = Ydb.Table.IQuery; +import IType = Ydb.IType; +import DescribeTableResult = Ydb.Table.DescribeTableResult; +import PrepareQueryResult = Ydb.Table.PrepareQueryResult; +import ExecuteQueryResult = Ydb.Table.ExecuteQueryResult; +import ExplainQueryResult = Ydb.Table.ExplainQueryResult; +import AutoPartitioningPolicy = Ydb.Table.PartitioningPolicy.AutoPartitioningPolicy; +import ITypedValue = Ydb.ITypedValue; +import FeatureFlag = Ydb.FeatureFlag.Status; +import Compression = Ydb.Table.ColumnFamilyPolicy.Compression; +import ExecuteScanQueryPartialResult = Ydb.Table.ExecuteScanQueryPartialResult; +import IKeyRange = Ydb.Table.IKeyRange; +import TypedValue = Ydb.TypedValue; +import BulkUpsertResult = Ydb.Table.BulkUpsertResult; + +export interface IExistingTransaction { + txId: string +} + +export interface INewTransaction { + beginTx: ITransactionSettings, + commitTx: boolean +} + +export const AUTO_TX: INewTransaction = { + beginTx: { + serializableReadWrite: {} + }, + commitTx: true +}; + +export class OperationParams implements Ydb.Operations.IOperationParams { + operationMode?: OperationMode; + operationTimeout?: google.protobuf.IDuration; + cancelAfter?: google.protobuf.IDuration; + labels?: { [k: string]: string }; + reportCostInfo?: Ydb.FeatureFlag.Status; + + withSyncMode() { + this.operationMode = OperationMode.SYNC; + return this; + } + + withAsyncMode() { + this.operationMode = OperationMode.ASYNC; + return this; + } + + withOperationTimeout(duration: google.protobuf.IDuration) { + this.operationTimeout = duration; + return this; + } + + withOperationTimeoutSeconds(seconds: number) { + this.operationTimeout = {seconds}; + return this; + } + + withCancelAfter(duration: google.protobuf.IDuration) { + this.cancelAfter = duration; + return this; + } + + withCancelAfterSeconds(seconds: number) { + this.cancelAfter = {seconds}; + return this; + } + + withLabels(labels: { [k: string]: string }) { + this.labels = labels; + return this; + } + + withReportCostInfo() { + this.reportCostInfo = Ydb.FeatureFlag.Status.ENABLED; + return this; + } +} + +export class OperationParamsSettings { + operationParams?: OperationParams; + + withOperationParams(operationParams: OperationParams) { + this.operationParams = operationParams; + return this; + } +} + +export class BeginTransactionSettings extends OperationParamsSettings { +} + +export class CommitTransactionSettings extends OperationParamsSettings { + collectStats?: Ydb.Table.QueryStatsCollection.Mode; + + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { + this.collectStats = collectStats; + return this; + } +} + +export class RollbackTransactionSettings extends OperationParamsSettings { +} + +interface IQueryParams { + [k: string]: Ydb.ITypedValue +} + +export class CreateTableSettings extends OperationParamsSettings { +} + +export class AlterTableSettings extends OperationParamsSettings { +} + +interface IDropTableSettings { + muteNonExistingTableErrors: boolean; +} + +export class DropTableSettings extends OperationParamsSettings { + muteNonExistingTableErrors: boolean; + + constructor({muteNonExistingTableErrors = true} = {} as IDropTableSettings) { + super(); + this.muteNonExistingTableErrors = muteNonExistingTableErrors; + } +} + +export class DescribeTableSettings extends OperationParamsSettings { + includeShardKeyBounds?: boolean; + includeTableStats?: boolean; + includePartitionStats?: boolean; + + withIncludeShardKeyBounds(includeShardKeyBounds: boolean) { + this.includeShardKeyBounds = includeShardKeyBounds; + return this; + } + + withIncludeTableStats(includeTableStats: boolean) { + this.includeTableStats = includeTableStats; + return this; + } + + withIncludePartitionStats(includePartitionStats: boolean) { + this.includePartitionStats = includePartitionStats; + return this; + } +} + +export class PrepareQuerySettings extends OperationParamsSettings { +} + +export class ExecuteQuerySettings extends OperationParamsSettings { + keepInCache: boolean = false; + collectStats?: Ydb.Table.QueryStatsCollection.Mode; + onResponseMetadata?: (metadata: grpc.Metadata) => void; + + withKeepInCache(keepInCache: boolean) { + this.keepInCache = keepInCache; + return this; + } + + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { + this.collectStats = collectStats; + return this; + } +} + +export class BulkUpsertSettings extends OperationParamsSettings { +} + +export class ReadTableSettings { + columns?: string[]; + ordered?: boolean; + rowLimit?: number; + keyRange?: Ydb.Table.IKeyRange; + + withRowLimit(rowLimit: number) { + this.rowLimit = rowLimit; + return this; + } + + withColumns(...columns: string[]) { + this.columns = columns; + return this; + } + + withOrdered(ordered: boolean) { + this.ordered = ordered; + return this; + } + + withKeyRange(keyRange: IKeyRange) { + this.keyRange = keyRange; + return this; + } + + withKeyGreater(value: ITypedValue) { + this.getOrInitKeyRange().greater = value; + return this; + } + + withKeyGreaterOrEqual(value: ITypedValue) { + this.getOrInitKeyRange().greaterOrEqual = value; + return this; + } + + withKeyLess(value: ITypedValue) { + this.getOrInitKeyRange().less = value; + return this; + } + + withKeyLessOrEqual(value: ITypedValue) { + this.getOrInitKeyRange().lessOrEqual = value; + return this; + } + + private getOrInitKeyRange() { + if (!this.keyRange) { + this.keyRange = {}; + } + return this.keyRange; + } +} + +export class ExecuteScanQuerySettings { + mode?: Ydb.Table.ExecuteScanQueryRequest.Mode; + collectStats?: Ydb.Table.QueryStatsCollection.Mode; + + withMode(mode: Ydb.Table.ExecuteScanQueryRequest.Mode) { + this.mode = mode; + return this; + } + + withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) { + this.collectStats = collectStats; + return this; + } +} + +export class TableSession extends Session { + + constructor( + api: TableService, + endpoint: Endpoint, + sessionId: string, + logger: Logger, + getResponseMetadata: (request: object) => grpc.Metadata | undefined + ) { + super(api, endpoint, sessionId, logger, getResponseMetadata); + } + + @retryable() + @pessimizable + public async createTable( + tablePath: string, + description: TableDescription, + settings?: CreateTableSettings, + ): Promise { + const request: Ydb.Table.ICreateTableRequest = { + ...description, + sessionId: this.sessionId, + path: `${this.endpoint.database}/${tablePath}`, + }; + + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.createTable(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async alterTable( + tablePath: string, + description: AlterTableDescription, + settings?: AlterTableSettings + ): Promise { + const request: Ydb.Table.IAlterTableRequest = { + ...description, + sessionId: this.sessionId, + path: `${this.endpoint.database}/${tablePath}`, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + + const response = await this.api.alterTable(request); + try { + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } catch (error) { + // !! does not returns response status if async operation mode + if (request.operationParams?.operationMode !== OperationMode.SYNC && error instanceof MissingStatus) return; + throw error; + } + } + + /* + Drop table located at `tablePath` in the current database. By default dropping non-existent tables does not + throw an error, to throw an error pass `new DropTableSettings({muteNonExistingTableErrors: true})` as 2nd argument. + */ + @retryable() + @pessimizable + public async dropTable(tablePath: string, settings?: DropTableSettings): Promise { + const request: Ydb.Table.IDropTableRequest = { + sessionId: this.sessionId, + path: `${this.endpoint.database}/${tablePath}`, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + settings = settings || new DropTableSettings(); + const suppressedErrors = settings?.muteNonExistingTableErrors ? [SchemeError.status] : []; + const response = await this.api.dropTable(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response), suppressedErrors); + } + + @retryable() + @pessimizable + public async describeTable(tablePath: string, settings?: DescribeTableSettings): Promise { + const request: Ydb.Table.IDescribeTableRequest = { + sessionId: this.sessionId, + path: `${this.endpoint.database}/${tablePath}`, + operationParams: settings?.operationParams, + }; + + if (settings) { + request.includeTableStats = settings.includeTableStats; + request.includeShardKeyBounds = settings.includeShardKeyBounds; + request.includePartitionStats = settings.includePartitionStats; + request.operationParams = settings.operationParams; + } + + const response = await this.api.describeTable(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + return DescribeTableResult.decode(payload); + } + + @retryable() + @pessimizable + public async describeTableOptions( + settings?: DescribeTableSettings, + ): Promise { + const request: Ydb.Table.IDescribeTableOptionsRequest = { + operationParams: settings?.operationParams, + }; + + const response = await this.api.describeTableOptions(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + return Ydb.Table.DescribeTableOptionsResult.decode(payload); + } + + @retryable() + @pessimizable + public async prepareQuery(queryText: string, settings?: PrepareQuerySettings): Promise { + const request: Ydb.Table.IPrepareDataQueryRequest = { + sessionId: this.sessionId, + yqlText: queryText, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.prepareDataQuery(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + return PrepareQueryResult.decode(payload); + } + + @pessimizable + public async executeQuery( + query: PrepareQueryResult | string, + params: IQueryParams = {}, + txControl: IExistingTransaction | INewTransaction = AUTO_TX, + settings?: ExecuteQuerySettings, + ): Promise { + this.logger.trace('preparedQuery %o', query); + this.logger.trace('parameters %o', params); + let queryToExecute: IQuery; + let keepInCache = false; + if (typeof query === 'string') { + queryToExecute = { + yqlText: query + }; + if (settings?.keepInCache !== undefined) { + keepInCache = settings.keepInCache; + } + } else { + queryToExecute = { + id: query.queryId + }; + } + const request: Ydb.Table.IExecuteDataQueryRequest = { + sessionId: this.sessionId, + txControl, + parameters: params, + query: queryToExecute, + }; + if (settings) { + request.operationParams = settings.operationParams; + request.collectStats = settings.collectStats; + } + if (keepInCache) { + request.queryCachePolicy = {keepInCache}; + } + const response = await this.api.executeDataQuery(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata)); + return ExecuteQueryResult.decode(payload); + } + + @pessimizable + public async bulkUpsert(tablePath: string, rows: TypedValue, settings?: BulkUpsertSettings) { + const request: Ydb.Table.IBulkUpsertRequest = { + table: `${this.endpoint.database}/${tablePath}`, + rows, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.bulkUpsert(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + return BulkUpsertResult.decode(payload); + } + + @pessimizable + public async streamReadTable( + tablePath: string, + consumer: (result: Ydb.Table.ReadTableResult) => void, + settings?: ReadTableSettings): Promise { + const request: Ydb.Table.IReadTableRequest = { + sessionId: this.sessionId, + path: `${this.endpoint.database}/${tablePath}`, + }; + if (settings) { + request.columns = settings.columns; + request.ordered = settings.ordered; + request.rowLimit = settings.rowLimit; + request.keyRange = settings.keyRange; + } + + return this.executeStreamRequest( + request, + this.api.streamReadTable.bind(this.api), + Ydb.Table.ReadTableResult.create, + consumer); + } + + @pessimizable + public async streamExecuteScanQuery( + query: PrepareQueryResult | string, + consumer: (result: ExecuteScanQueryPartialResult) => void, + params: IQueryParams = {}, + settings?: ExecuteScanQuerySettings): Promise { + let queryToExecute: IQuery; + if (typeof query === 'string') { + queryToExecute = { + yqlText: query + }; + } else { + queryToExecute = { + id: query.queryId + }; + } + + const request: Ydb.Table.IExecuteScanQueryRequest = { + query: queryToExecute, + parameters: params, + mode: settings?.mode || Ydb.Table.ExecuteScanQueryRequest.Mode.MODE_EXEC, + }; + + if (settings) { + request.collectStats = settings.collectStats; + } + + return this.executeStreamRequest( + request, + this.api.streamExecuteScanQuery.bind(this.api), + ExecuteScanQueryPartialResult.create, + consumer); + } + + public async explainQuery(query: string, operationParams?: Ydb.Operations.IOperationParams): Promise { + const request: Ydb.Table.IExplainDataQueryRequest = { + sessionId: this.sessionId, + yqlText: query, + operationParams + }; + const response = await this.api.explainDataQuery(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + return ExplainQueryResult.decode(payload); + } + + @retryable() + @pessimizable + public async beginTransaction( + txSettings: ITransactionSettings, + settings?: BeginTransactionSettings, + ): Promise { + const request: Ydb.Table.IBeginTransactionRequest = { + sessionId: this.sessionId, + txSettings, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.beginTransaction(request); + const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const {txMeta} = BeginTransactionResult.decode(payload); + if (txMeta) { + return txMeta; + } + throw new Error('Could not begin new transaction, txMeta is empty!'); + } + + @retryable() + @pessimizable + public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { + const request: Ydb.Table.ICommitTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + request.collectStats = settings.collectStats; + } + const response = await this.api.commitTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { + const request: Ydb.Table.IRollbackTransactionRequest = { + sessionId: this.sessionId, + txId: txControl.txId, + }; + if (settings) { + request.operationParams = settings.operationParams; + } + const response = await this.api.rollbackTransaction(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } + + @retryable() + @pessimizable + public async keepAlive(): Promise { + if (typeof this.api.keepAlive !== 'function') { + throw new Error('This service does not support keep alive method'); + } + const request = {sessionId: this.sessionId}; + const response = await this.api.keepAlive(request); + ensureOperationSucceeded(this.processResponseMetadata(request, response)); + } +} + +export class Column implements Ydb.Table.IColumnMeta { + constructor(public name: string, public type: IType, public family?: string) {} +} + +export class StorageSettings implements Ydb.Table.IStoragePool { + constructor(public media: string) {} +} + +export class ColumnFamilyPolicy implements Ydb.Table.IColumnFamilyPolicy { + public name?: string; + public data?: StorageSettings; + public external?: StorageSettings; + public keepInMemory?: FeatureFlag; + public compression?: Compression; + + withName(name: string) { + this.name = name; + return this; + } + + withData(data: StorageSettings) { + this.data = data; + return this; + } + + withExternal(external: StorageSettings) { + this.external = external; + return this; + } + + withKeepInMemory(keepInMemory: FeatureFlag) { + this.keepInMemory = keepInMemory; + return this; + } + + withCompression(compression: Compression) { + this.compression = compression; + return this; + } +} + +export class StoragePolicy implements Ydb.Table.IStoragePolicy { + public presetName?: string; + public syslog?: StorageSettings; + public log?: StorageSettings; + public data?: StorageSettings; + public external?: StorageSettings; + public keepInMemory?: FeatureFlag; + public columnFamilies: ColumnFamilyPolicy[] = []; + + withPresetName(presetName: string) { + this.presetName = presetName; + return this; + } + + withSyslog(syslog: StorageSettings) { + this.syslog = syslog; + return this; + } + + withLog(log: StorageSettings) { + this.log = log; + return this; + } + + withData(data: StorageSettings) { + this.data = data; + return this; + } + + withExternal(external: StorageSettings) { + this.external = external; + return this; + } + + withKeepInMemory(keepInMemory: FeatureFlag) { + this.keepInMemory = keepInMemory; + return this; + } + + withColumnFamilies(...columnFamilies: ColumnFamilyPolicy[]) { + for (const policy of columnFamilies) { + this.columnFamilies.push(policy); + } + return this; + } +} + +export class ExplicitPartitions implements Ydb.Table.IExplicitPartitions { + constructor(public splitPoints: ITypedValue[]) {} +} + +export class PartitioningPolicy implements Ydb.Table.IPartitioningPolicy { + public presetName?: string; + public autoPartitioning?: AutoPartitioningPolicy; + public uniformPartitions?: number; + public explicitPartitions?: ExplicitPartitions; + + withPresetName(presetName: string) { + this.presetName = presetName; + return this; + } + + withUniformPartitions(uniformPartitions: number) { + this.uniformPartitions = uniformPartitions; + return this; + } + + withAutoPartitioning(autoPartitioning: AutoPartitioningPolicy) { + this.autoPartitioning = autoPartitioning; + return this; + } + + withExplicitPartitions(explicitPartitions: ExplicitPartitions) { + this.explicitPartitions = explicitPartitions; + return this; + } +} + +export class ReplicationPolicy implements Ydb.Table.IReplicationPolicy { + presetName?: string; + replicasCount?: number; + createPerAvailabilityZone?: FeatureFlag; + allowPromotion?: FeatureFlag; + + withPresetName(presetName: string) { + this.presetName = presetName; + return this; + } + + withReplicasCount(replicasCount: number) { + this.replicasCount = replicasCount; + return this; + } + + withCreatePerAvailabilityZone(createPerAvailabilityZone: FeatureFlag) { + this.createPerAvailabilityZone = createPerAvailabilityZone; + return this; + } + + withAllowPromotion(allowPromotion: FeatureFlag) { + this.allowPromotion = allowPromotion; + return this; + } +} + +export class CompactionPolicy implements Ydb.Table.ICompactionPolicy { + constructor(public presetName: string) {} +} + +export class ExecutionPolicy implements Ydb.Table.IExecutionPolicy { + constructor(public presetName: string) {} +} + +export class CachingPolicy implements Ydb.Table.ICachingPolicy { + constructor(public presetName: string) {} +} + +export class TableProfile implements Ydb.Table.ITableProfile { + public presetName?: string; + public storagePolicy?: StoragePolicy; + public compactionPolicy?: CompactionPolicy; + public partitioningPolicy?: PartitioningPolicy; + public executionPolicy?: ExecutionPolicy; + public replicationPolicy?: ReplicationPolicy; + public cachingPolicy?: CachingPolicy; + + withPresetName(presetName: string) { + this.presetName = presetName; + return this; + } + + withStoragePolicy(storagePolicy: StoragePolicy) { + this.storagePolicy = storagePolicy; + return this; + } + + withCompactionPolicy(compactionPolicy: CompactionPolicy) { + this.compactionPolicy = compactionPolicy; + return this; + } + + withPartitioningPolicy(partitioningPolicy: PartitioningPolicy) { + this.partitioningPolicy = partitioningPolicy; + return this; + } + + withExecutionPolicy(executionPolicy: ExecutionPolicy) { + this.executionPolicy = executionPolicy; + return this; + } + + withReplicationPolicy(replicationPolicy: ReplicationPolicy) { + this.replicationPolicy = replicationPolicy; + return this; + } + + withCachingPolicy(cachingPolicy: CachingPolicy) { + this.cachingPolicy = cachingPolicy; + return this; + } +} + +export class TableIndex implements Ydb.Table.ITableIndex { + public indexColumns: string[] = []; + public dataColumns: string[] | null = null; + public globalIndex: Ydb.Table.IGlobalIndex|null = null; + public globalAsyncIndex: Ydb.Table.IGlobalAsyncIndex|null = null; + + constructor(public name: string) {} + + withIndexColumns(...indexColumns: string[]) { + this.indexColumns.push(...indexColumns); + return this; + } + + /** Adds [covering index](https://ydb.tech/en/docs/concepts/secondary_indexes#covering) over columns */ + withDataColumns(...dataColumns: string[]) { + if(!this.dataColumns) this.dataColumns = [] + this.dataColumns?.push(...dataColumns) + return this + } + + withGlobalAsync(isAsync: boolean) { + if(isAsync) { + this.globalAsyncIndex = new Ydb.Table.GlobalAsyncIndex() + this.globalIndex = null + } + else { + this.globalAsyncIndex = null + this.globalIndex = new Ydb.Table.GlobalIndex() + } + return this + } +} + +export class TtlSettings implements Ydb.Table.ITtlSettings { + public dateTypeColumn?: Ydb.Table.IDateTypeColumnModeSettings | null; + constructor(columnName: string, expireAfterSeconds: number = 0) { + this.dateTypeColumn = { columnName, expireAfterSeconds }; + } +} + +export class TableDescription implements Ydb.Table.ICreateTableRequest { + /** @deprecated use TableDescription options instead */ + public profile?: TableProfile; + public indexes: TableIndex[] = []; + public ttlSettings?: TtlSettings; + public partitioningSettings?: Ydb.Table.IPartitioningSettings; + public uniformPartitions?: number; + public columnFamilies?: Ydb.Table.IColumnFamily[]; + public attributes?: {[k: string]: string}; + public compactionPolicy?: 'default' | 'small_table' | 'log_table'; + public keyBloomFilter?: FeatureFlag; + public partitionAtKeys?: Ydb.Table.IExplicitPartitions; + public readReplicasSettings?: Ydb.Table.IReadReplicasSettings; + public storageSettings?: Ydb.Table.IStorageSettings; + // path and operationPrams defined in createTable, + // columns and primaryKey are in constructor + + constructor(public columns: Column[] = [], public primaryKey: string[] = []) {} + + withColumn(column: Column) { + this.columns.push(column); + return this; + } + + withColumns(...columns: Column[]) { + for (const column of columns) { + this.columns.push(column); + } + return this; + } + + withPrimaryKey(key: string) { + this.primaryKey.push(key); + return this; + } + + withPrimaryKeys(...keys: string[]) { + for (const key of keys) { + this.primaryKey.push(key); + } + return this; + } + + /** @deprecated use TableDescription options instead */ + withProfile(profile: TableProfile) { + this.profile = profile; + return this; + } + + withIndex(index: TableIndex) { + this.indexes.push(index); + return this; + } + + withIndexes(...indexes: TableIndex[]) { + for (const index of indexes) { + this.indexes.push(index); + } + return this; + } + + withTtl(columnName: string, expireAfterSeconds: number = 0) { + this.ttlSettings = new TtlSettings(columnName, expireAfterSeconds); + return this; + } + + withPartitioningSettings(partitioningSettings: Ydb.Table.IPartitioningSettings) { + this.partitioningSettings = partitioningSettings; + } +} + +export class AlterTableDescription { + public addColumns: Column[] = []; + public dropColumns: string[] = []; + public alterColumns: Column[] = []; + public setTtlSettings?: TtlSettings; + public dropTtlSettings?: {}; + public addIndexes: TableIndex[] = []; + public dropIndexes: string[] = []; + public alterStorageSettings?: Ydb.Table.IStorageSettings; + public addColumnFamilies?: Ydb.Table.IColumnFamily[]; + public alterColumnFamilies?: Ydb.Table.IColumnFamily[]; + public alterAttributes?: { [k: string]: string }; + public setCompactionPolicy?: string; + public alterPartitioningSettings?: Ydb.Table.IPartitioningSettings; + public setKeyBloomFilter?: Ydb.FeatureFlag.Status; + public setReadReplicasSettings?: Ydb.Table.IReadReplicasSettings; + public addChangefeeds?: Ydb.Table.IChangefeed[]; + public dropChangefeeds?: string[]; + public renameIndexes?: Ydb.Table.IRenameIndexItem[]; + + constructor() {} + + withAddColumn(column: Column) { + this.addColumns.push(column); + return this; + } + + withDropColumn(columnName: string) { + this.dropColumns.push(columnName); + return this; + } + + withAlterColumn(column: Column) { + this.alterColumns.push(column); + return this; + } + + withSetTtl(columnName: string, expireAfterSeconds: number = 0) { + this.setTtlSettings = new TtlSettings(columnName, expireAfterSeconds); + return this; + } + + withDropTtl() { + this.dropTtlSettings = {}; + return this; + } +} diff --git a/src/test-utils.ts b/src/test-utils.ts index e06f2062..65c949b9 100644 --- a/src/test-utils.ts +++ b/src/test-utils.ts @@ -2,7 +2,7 @@ import fs from 'fs'; import path from 'path'; import Driver, {IDriverSettings} from "./driver"; import {declareType, TypedData, Types} from "./types"; -import {Column, TableSession, TableDescription} from "./table/table-client"; +import {Column, TableSession, TableDescription} from "./table/table-session"; import {withRetries} from "./retries"; import {AnonymousAuthService} from "./credentials";