diff --git a/query-service/exec-query.test.ts b/src/__tests__/e2e/query-service/exec-query.test.ts similarity index 99% rename from query-service/exec-query.test.ts rename to src/__tests__/e2e/query-service/exec-query.test.ts index eb6e72ad..95bfb3c8 100644 --- a/query-service/exec-query.test.ts +++ b/src/__tests__/e2e/query-service/exec-query.test.ts @@ -56,7 +56,6 @@ describe('Query service', () => { }, }); - // { // const rows = await execQuery(session, new ReadTableSettings()); // expect(rows).toEqual(expectedRows); diff --git a/src/__tests__/e2e/table-service/connection.test.ts b/src/__tests__/e2e/table-service/connection.test.ts index 627533fc..bbeb320d 100644 --- a/src/__tests__/e2e/table-service/connection.test.ts +++ b/src/__tests__/e2e/table-service/connection.test.ts @@ -1,5 +1,7 @@ import {initDriver, destroyDriver} from '../../../test-utils'; +Error.stackTraceLimit = Infinity; + describe('Connection', () => { it('Test GRPC connection', async () => { let driver = await initDriver({endpoint: 'grpc://localhost:2136'}); @@ -9,7 +11,7 @@ describe('Connection', () => { await destroyDriver(driver) }); - it('Test GRPCS connection', async () => { + xit('Test GRPCS connection', async () => { let driver = await initDriver(); await driver.tableClient.withSession(async (session) => { await session.executeQuery('SELECT 1'); diff --git a/src/__tests__/e2e/table-service/graceful-session-close.test.ts b/src/__tests__/e2e/table-service/graceful-session-close.test.ts index 5a33d3bd..bf1ff5d5 100644 --- a/src/__tests__/e2e/table-service/graceful-session-close.test.ts +++ b/src/__tests__/e2e/table-service/graceful-session-close.test.ts @@ -1,7 +1,8 @@ import http from 'http'; import Driver from "../../../driver"; import {destroyDriver, initDriver} from "../../../test-utils"; -import {sleep} from "../../../utils"; + +import {sleep} from "../../../utils/sleep"; const SHUTDOWN_URL = process.env.YDB_SHUTDOWN_URL || 'http://localhost:8765/actors/kqp_proxy?force_shutdown=all'; diff --git a/src/__tests__/e2e/table-service/retries.test.ts b/src/__tests__/e2e/table-service/retries.test.ts index ef08830d..3ac46043 100644 --- a/src/__tests__/e2e/table-service/retries.test.ts +++ b/src/__tests__/e2e/table-service/retries.test.ts @@ -22,7 +22,8 @@ import { import {FallbackLogger} from '../../../logging'; import {RetryParameters, retryable} from '../../../retries'; import {destroyDriver, initDriver} from '../../../test-utils'; -import {pessimizable} from '../../../utils'; + +import {pessimizable} from "../../../utils"; const logger = new FallbackLogger({level: 'error'}); class ErrorThrower { diff --git a/src/__tests__/unit/BackoffSettings.test.ts b/src/__tests__/unit/BackoffSettings.test.ts index e031e9d6..1d32333b 100644 --- a/src/__tests__/unit/BackoffSettings.test.ts +++ b/src/__tests__/unit/BackoffSettings.test.ts @@ -1,5 +1,5 @@ import {BackoffSettings} from '../../retries'; -import * as utils from '../../utils'; +import * as utils from '../../utils/sleep'; function runTest(backoff: BackoffSettings, retries: number, min: number, max: number) { it(`have correct value for ${retries} retries`, () => { diff --git a/src/credentials.ts b/src/credentials.ts index 39a2c83c..8a1f4d2a 100644 --- a/src/credentials.ts +++ b/src/credentials.ts @@ -1,7 +1,6 @@ import * as grpc from '@grpc/grpc-js'; import jwt from 'jsonwebtoken'; import {DateTime} from 'luxon'; -import {GrpcService, sleep, withTimeout} from './utils'; import {yandex, Ydb} from 'ydb-sdk-proto'; import {ISslCredentials, makeDefaultSslCredentials} from './ssl-credentials'; import IamTokenService = yandex.cloud.iam.v1.IamTokenService; @@ -9,7 +8,10 @@ import AuthServiceResult = Ydb.Auth.LoginResult; import ICreateIamTokenResponse = yandex.cloud.iam.v1.ICreateIamTokenResponse; import type {MetadataTokenService} from '@yandex-cloud/nodejs-sdk/dist/token-service/metadata-token-service'; import {retryable} from './retries'; -import {getOperationPayload} from "./table/table-utils"; +import {sleep} from "./utils/sleep"; +import {withTimeout} from "./utils/with-timeout"; +import {GrpcService} from "./utils/grpc-service"; +import {getOperationPayload} from "./table/utils/get-operation-payload"; function makeCredentialsMetadata(token: string): grpc.Metadata { const metadata = new grpc.Metadata(); diff --git a/src/decimal.ts b/src/decimal.ts index 55773166..01599b83 100644 --- a/src/decimal.ts +++ b/src/decimal.ts @@ -1,7 +1,7 @@ import {Ydb} from 'ydb-sdk-proto'; import Long from 'long'; -import {toLong} from './utils'; import IValue = Ydb.IValue; +import {toLong} from "./utils/to-long"; const DECIMAL_REGEX = /^-?\d+(\.\d+)?/; diff --git a/src/discovery.ts b/src/discovery.ts index e642178b..8530de9a 100644 --- a/src/discovery.ts +++ b/src/discovery.ts @@ -2,7 +2,6 @@ import _ from 'lodash'; import EventEmitter from 'events'; import {DateTime} from 'luxon'; import {Ydb} from "ydb-sdk-proto"; -import {AuthenticatedService, withTimeout} from "./utils"; import {IAuthService} from "./credentials"; import {retryable} from "./retries"; // noinspection ES6PreferShortImport @@ -11,7 +10,9 @@ import DiscoveryServiceAPI = Ydb.Discovery.V1.DiscoveryService; import IEndpointInfo = Ydb.Discovery.IEndpointInfo; import {Events} from "./constants"; import {ISslCredentials} from './ssl-credentials'; -import {getOperationPayload} from "./table/table-utils"; +import {TableAuthenticatedService} from "./table/table-authenticated-service"; +import {withTimeout} from "./utils/with-timeout"; +import {getOperationPayload} from "./table/utils/get-operation-payload"; type SuccessDiscoveryHandler = (result: Endpoint[]) => void; @@ -81,7 +82,7 @@ interface IDiscoverySettings { sslCredentials?: ISslCredentials, } -export default class DiscoveryService extends AuthenticatedService { +export default class DiscoveryService extends TableAuthenticatedService { private readonly database: string; private readonly discoveryPeriod: number; private readonly endpointsPromise: Promise; diff --git a/src/driver.ts b/src/driver.ts index aedea4db..e93ae73b 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -5,12 +5,13 @@ import {IAuthService} from './credentials'; import {TimeoutExpired} from './errors'; import {getLogger, Logger} from './logging'; import SchemeClient from './scheme'; -import {ClientOptions} from './utils'; import {parseConnectionString} from './parse-connection-string'; import {makeSslCredentials, ISslCredentials} from './ssl-credentials'; import {TableClient} from "./table"; import {QueryClient} from "./query/query-client"; +import {ClientOptions} from "./utils/client-options"; + export interface IPoolSettings { minLimit?: number; maxLimit?: number; @@ -72,6 +73,7 @@ export default class Driver { discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD, logger: this.logger, }); + this.tableClient = new TableClient({ database: this.database, authService: this.authService, @@ -82,11 +84,12 @@ export default class Driver { logger: this.logger, }); + // this.queryClient = {} as unknown as QueryClient; this.queryClient = new QueryClient({ database: this.database, authService: this.authService, sslCredentials: this.sslCredentials, - poolSettings: this.poolSettings, + poolSettings: {...this.poolSettings, minLimit: 1}, clientOptions: this.clientOptions, discoveryService: this.discoveryService, logger: this.logger, diff --git a/src/query/_contextProto.ts b/src/query/_contextProto.ts new file mode 100644 index 00000000..6a5ab106 --- /dev/null +++ b/src/query/_contextProto.ts @@ -0,0 +1,47 @@ +// import {Logger} from "../logging"; +// +// const CancelPromise = Symbol('CancelPromise'); +// +// class Context { +// withId(id:) +// withValue +// witghCancel +// withTimeout() +// cancel(), +// getValue(name: string, type: function), +// +// } +// +// +// +// class SomeContext /*extends Context*/ { +// +// constructor(opts: { +// id?, +// ctx?: SomeContext, // parent context +// timeout?: number, // fluxon duration +// // cancellable?: boolean, +// // requestId?: string, +// logger?: Logger, +// // something about spans +// } = {}) { +// +// +// +// +// +// // ensure logger +// } +// +// cancel() { +// +// } +// +// +// [CancelPromise]?: Promise; +// +// +// get cancelPromise() { +// return this[CancelPromise]; +// } +// } diff --git a/src/query/client-factory.ts b/src/query/client-factory.ts new file mode 100644 index 00000000..855fcd39 --- /dev/null +++ b/src/query/client-factory.ts @@ -0,0 +1,27 @@ +// // TODO: Ref count +// +// import {Client} from "@grpc/grpc-js"; +// import {IAuthService} from "../credentials"; +// import {Logger} from "../logging"; +// import {ISslCredentials} from "../ssl-credentials"; +// import {ClientOptions} from "../utils"; +// +// class GrpcClientFactory { +// +// constructor(database: string, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { +// } +// +// // TODO: Consider to close clients af some period of time of inactivity +// // TODO: We may close clients that are no longer in discovery list +// private cache = new WeakMap(); +// +// aquire(): Client { +// +// } +// +// release(client: Client) { +// +// } +// } +// +// diff --git a/src/query/query-authenticated-service.ts b/src/query/query-authenticated-service.ts new file mode 100644 index 00000000..79e4bc6a --- /dev/null +++ b/src/query/query-authenticated-service.ts @@ -0,0 +1,108 @@ +import * as $protobuf from "protobufjs"; +import {ISslCredentials} from "../ssl-credentials"; +import * as grpc from "@grpc/grpc-js"; +import _ from "lodash"; +import {IAuthService} from "../credentials"; +import {getVersionHeader} from "../version"; +import {StreamEnd} from "../table/utils"; +import {ClientOptions, getDatabaseHeader, MetadataHeaders, removeProtocol, ServiceFactory} from "../utils"; +import {TableAuthenticatedService} from "../table/table-authenticated-service"; + +export abstract class QueryAuthenticatedService { + protected api: Api; // TODO: Consider remove + protected metadata: grpc.Metadata; + private responseMetadata: WeakMap; + private lastRequest!: object; + + protected readonly headers: MetadataHeaders; + + static isServiceAsyncMethod(target: object, prop: string | number | symbol, receiver: any) { + return ( + Reflect.has(target, prop) && + typeof Reflect.get(target, prop, receiver) === 'function' && + prop !== 'create' + ); + } + + public getResponseMetadata(request: object) { + return this.responseMetadata.get(request); + } + + /** + * + * @param host + * @param database + * @param name + * @param apiCtor + * @param authService + * @param sslCredentials + * @param clientOptions + * @param streamMethods In query service API unlike table service API methods that return stream + * are not labeled with the word Stream in the name. So they must be listed explicitly + * @protected + */ + protected constructor( + host: string, + database: string, + private name: string, + private apiCtor: ServiceFactory, + protected authService: IAuthService, + protected sslCredentials?: ISslCredentials, + protected clientOptions?: ClientOptions, + private streamMethods?: string[], + ) { + this.headers = new Map([getVersionHeader(), getDatabaseHeader(database)]); + this.metadata = new grpc.Metadata(); + this.responseMetadata = new WeakMap(); + this.api = new Proxy( + this.getClient(removeProtocol(host), this.sslCredentials, clientOptions), + { + get: (target, prop, receiver) => { + const property = Reflect.get(target, prop, receiver); + return TableAuthenticatedService.isServiceAsyncMethod(target, prop, receiver) ? + async (...args: any[]) => { + if (!['emit', 'rpcCall', 'rpcImpl'].includes(String(prop))) { + if (args.length) { + this.lastRequest = args[0]; + } + } + + this.metadata = await this.authService.getAuthMetadata(); + for (const [name, value] of this.headers) { + if (value) { + this.metadata.add(name, value); + } + } + + return property.call(receiver, ...args); + } : + property; + } + } + ); + } + + protected getClient(host: string, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions): Api { + const client = sslCredentials ? + new grpc.Client(host, grpc.credentials.createSsl(sslCredentials.rootCertificates, sslCredentials.clientCertChain, sslCredentials.clientPrivateKey), clientOptions) : + new grpc.Client(host, grpc.credentials.createInsecure(), clientOptions); + const rpcImpl: $protobuf.RPCImpl = (method, requestData, callback) => { + const path = `/${this.name}/${method.name}`; + if (method.name.startsWith('Stream') || this.streamMethods?.findIndex((v) => v === method.name)) { + client.makeServerStreamRequest(path, _.identity, _.identity, requestData, this.metadata) + .on('data', (data) => callback(null, data)) + .on('end', () => callback(new StreamEnd(), null)) + .on('error', (error) => callback(error, null)); + } else { + const req = client.makeUnaryRequest(path, _.identity, _.identity, requestData, this.metadata, callback); + const lastRequest = this.lastRequest; + req.on('status', ({metadata}: grpc.StatusObject) => { + if (lastRequest) { + this.responseMetadata.set(lastRequest, metadata); + } + }); + } + }; + return this.apiCtor.create(rpcImpl); + } +} diff --git a/src/query/query-client.ts b/src/query/query-client.ts index 00662f1a..9b36fda8 100644 --- a/src/query/query-client.ts +++ b/src/query/query-client.ts @@ -6,13 +6,14 @@ import {QuerySession} from "./query-session"; import {IAuthService} from "../credentials"; import {ISslCredentials} from "../ssl-credentials"; import {IPoolSettings} from "../driver"; -import {ClientOptions} from "../utils"; import DiscoveryService from "../discovery"; import {Logger} from "../logging"; import {QuerySessionsPool} from "./query-sessions-pool"; import {Ydb} from "ydb-sdk-proto"; import TransactionSettings = Ydb.Query.TransactionSettings; +import {ClientOptions} from "../utils/client-options"; + type SessionCallback = (session: QuerySession) => Promise; export interface IQueryClientSettings { database: string; diff --git a/src/query/query-session.ts b/src/query/query-session.ts index 64c07d1d..a9624fb5 100644 --- a/src/query/query-session.ts +++ b/src/query/query-session.ts @@ -2,7 +2,6 @@ import EventEmitter from 'events'; // import * as grpc from '@grpc/grpc-js'; import {Ydb} from 'ydb-sdk-proto'; -import {pessimizable} from '../utils'; import {Endpoint} from '../discovery'; import {Logger} from '../logging'; import {retryable} from '../retries'; @@ -11,6 +10,7 @@ import ICreateSessionResponse = Ydb.Query.ICreateSessionResponse; import ITransactionSettings = Ydb.Query.ITransactionSettings; import {SessionEvent} from "../table"; import {ensureOperationSucceeded} from "./query-utils"; +import {pessimizable} from "../utils"; // TODO: Add context to session export class QuerySession extends EventEmitter implements ICreateSessionResponse { @@ -31,17 +31,19 @@ export class QuerySession extends EventEmitter implements ICreateSessionResponse } async attach() { + // TODO: Rewrite + // promisify(this.api.attachSession)() - const state = new Promise((resolve, reject) => { - this.api.attachSession({sessionId: this.sessionId}, (error, response) => { - console.info(1000, 'sessionId', this.sessionId, error, response); - if (error) reject(error); - resolve(response); - }); - }); - if (state) { - console.info(1100, 'state', state); - } + // const state = new Promise((resolve, reject) => { + // this.api.attachSession({sessionId: this.sessionId}, (error, response) => { + // console.info(1000, 'sessionId', this.sessionId, error, response); + // if (error) reject(error); + // resolve(response); + // }); + // }); + // if (state) { + // console.info(1100, 'state', state); + // } } acquire() { @@ -73,7 +75,7 @@ export class QuerySession extends EventEmitter implements ICreateSessionResponse return Promise.resolve(); } this.beingDeleted = true; - ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); // TODO: Shouldn't session delete has retry? + ensureOperationSucceeded(await this.api.deleteSession({sessionId: this.sessionId})); } @retryable() diff --git a/src/query/query-sessions-pool.ts b/src/query/query-sessions-pool.ts index baf7a170..e36da869 100644 --- a/src/query/query-sessions-pool.ts +++ b/src/query/query-sessions-pool.ts @@ -1,7 +1,6 @@ import {Ydb} from 'ydb-sdk-proto'; import {IAuthService} from "../credentials"; import {ISslCredentials} from "../ssl-credentials"; -import {AuthenticatedService, ClientOptions, pessimizable} from "../utils"; import DiscoveryService, {Endpoint} from "../discovery"; import {Logger} from "../logging"; import EventEmitter from "events"; @@ -10,13 +9,16 @@ import _ from "lodash"; import {SessionPoolEmpty} from "../errors"; import {retryable} from "../retries"; import {QuerySession} from "./query-session"; -import {SessionEvent} from "../table/session-event"; +import {ClientOptions, SessionEvent, pessimizable, removeProtocol} from "../utils"; import {ensureOperationSucceeded} from "./query-utils"; import {IQueryClientSettings} from "./query-client"; import QueryService = Ydb.Query.V1.QueryService; import CreateSessionRequest = Ydb.Query.CreateSessionRequest; +import {QueryAuthenticatedService} from "./query-authenticated-service"; +import * as grpc from "@grpc/grpc-js"; +import CreateSessionResponse = Ydb.Query.CreateSessionResponse; -export class QuerySessionCreator extends AuthenticatedService { +export class QuerySessionCreator extends QueryAuthenticatedService { public endpoint: Endpoint; private readonly logger: Logger; @@ -32,10 +34,53 @@ export class QuerySessionCreator extends AuthenticatedService { @retryable() @pessimizable async create(): Promise { + console.info(2000); + const host = removeProtocol(this.endpoint.toString()); + console.info(2900, this.sslCredentials) + const client = this.sslCredentials ? + new grpc.Client(host, grpc.credentials.createSsl(this.sslCredentials.rootCertificates, this.sslCredentials.clientCertChain, this.sslCredentials.clientPrivateKey), this.clientOptions) : + new grpc.Client(host, grpc.credentials.createInsecure(), this.clientOptions); + const metadata = await this.authService.getAuthMetadata(); + for (const [name, value] of this.headers) { + if (value) { + metadata.add(name, value); + } + } + // TODO: add rights thru metadata + const res = await new Promise((resolve, reject) => { + console.info(3000, Buffer.isBuffer(CreateSessionRequest.encode(CreateSessionRequest.create()).finish())); + + /* + for (const [name, value] of this.headers) { + if (value) { + this.metadata.add(name, value); + } + } + */ + + // console.info(3050, this.metadata); + console.info(3050, metadata); + // client.makeUnaryRequest('/Ydb.Query.V1.QueryService/CreateSession', + client.makeUnaryRequest('/Ydb.Query.V1.QueryService/create', + (req: CreateSessionRequest) => CreateSessionRequest.encode(req).finish() as Buffer, + CreateSessionResponse.decode, + CreateSessionRequest.create(), + metadata, + (err, resp) => { + console.info(3200, err, resp); + if (err) reject(err); + resolve(resp!); + }); + }); + console.info(3300, res); + + // Query.V1.QueryService.create() const response = ensureOperationSucceeded(await this.api.createSession(CreateSessionRequest.create())); + console.info(2100, response); const {sessionId} = response; const session = new QuerySession(this.api, this.endpoint, sessionId, this.logger/*, this.getResponseMetadata.bind(this)*/); await session.attach(); + console.info(2200); return session; } } @@ -45,6 +90,7 @@ export class QuerySessionsPool extends EventEmitter { private readonly authService: IAuthService; private readonly sslCredentials?: ISslCredentials; private readonly clientOptions?: ClientOptions; + // @ts-ignore private readonly minLimit: number; private readonly maxLimit: number; private readonly sessions: Set; @@ -86,12 +132,15 @@ export class QuerySessionsPool extends EventEmitter { } private prepopulateSessions() { - _.forEach(_.range(this.minLimit), () => this.createSession()); + // TODO: No error handling + // _.forEach(_.range(this.minLimit), () => this.createSession()); } private async getSessionCreator(): Promise { const endpoint = await this.discoveryService.getEndpoint(); + console.info(1000, endpoint); if (!this.sessionCreators.has(endpoint)) { + console.info(1100) const sessionService = new QuerySessionCreator(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); this.sessionCreators.set(endpoint, sessionService); } diff --git a/src/retries.ts b/src/retries.ts index c11adfb9..2ec4d437 100644 --- a/src/retries.ts +++ b/src/retries.ts @@ -1,7 +1,8 @@ import {YdbError, TransportError} from './errors'; import {getLogger, Logger} from './logging'; import * as errors from './errors'; -import {sleep} from './utils'; + +import {sleep} from "./utils/sleep"; export class BackoffSettings { /** diff --git a/src/scheme.ts b/src/scheme.ts index a50aa8dc..6dcfe16f 100644 --- a/src/scheme.ts +++ b/src/scheme.ts @@ -1,9 +1,4 @@ import {Ydb} from "ydb-sdk-proto"; -import { - AuthenticatedService, - pessimizable, - ClientOptions -} from "./utils"; import {IAuthService} from "./credentials"; // noinspection ES6PreferShortImport import {Logger} from './logging'; @@ -20,7 +15,11 @@ import IMakeDirectoryRequest = Ydb.Scheme.IMakeDirectoryRequest; import IPermissions = Ydb.Scheme.IPermissions; import {util} from "protobufjs"; import EventEmitter = util.EventEmitter; -import {ensureOperationSucceeded, getOperationPayload} from "./table/table-utils"; +import {TableAuthenticatedService} from "./table/table-authenticated-service"; +import {pessimizable} from "./utils/pessimizable"; +import {getOperationPayload} from "./table/utils/get-operation-payload"; +import {ensureOperationSucceeded} from "./table/utils/ensure-operation-succeeded"; +import {ClientOptions} from "./utils/client-options"; function preparePermissions(action?: IPermissions | null) { @@ -117,7 +116,7 @@ export default class SchemeClient extends EventEmitter { } } -class SchemeService extends AuthenticatedService { +class SchemeService extends TableAuthenticatedService { private logger: Logger; private readonly database: string; public endpoint: Endpoint; diff --git a/src/table/index.ts b/src/table/index.ts index 405c7f7d..9286b67e 100644 --- a/src/table/index.ts +++ b/src/table/index.ts @@ -1,3 +1,3 @@ export * from './table-sessions-pool'; export * from './table-session'; -export * from "./session-event"; +export * from "../utils/session-event"; diff --git a/src/utils.ts b/src/table/table-authenticated-service.ts similarity index 52% rename from src/utils.ts rename to src/table/table-authenticated-service.ts index c7e3ac0c..afef12d2 100644 --- a/src/utils.ts +++ b/src/table/table-authenticated-service.ts @@ -1,74 +1,17 @@ -import * as grpc from '@grpc/grpc-js'; -import * as $protobuf from 'protobufjs'; -import _ from 'lodash'; -import Long from 'long'; -import {NotFound, TimeoutExpired} from "./errors"; - -import {Endpoint} from './discovery'; -import {IAuthService} from './credentials'; -import {getVersionHeader} from './version'; -import {ISslCredentials} from './ssl-credentials'; - -function getDatabaseHeader(database: string): [string, string] { - return ['x-ydb-database', database]; -} - -export interface Pessimizable { - endpoint: Endpoint; -} - -type ServiceFactory = { - create(rpcImpl: $protobuf.RPCImpl, requestDelimited?: boolean, responseDelimited?: boolean): T -}; - -function removeProtocol(endpoint: string) { - const re = /^(grpc:\/\/|grpcs:\/\/)?(.+)/; - const match = re.exec(endpoint) as string[]; - return match[2]; -} - -export function withTimeout(promise: Promise, timeoutMs: number): Promise { - let timeoutId: NodeJS.Timeout; - const timedRejection: Promise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject(new TimeoutExpired(`Timeout of ${timeoutMs}ms has expired`)); - }, timeoutMs); - }); - return Promise.race([promise.finally(() => { - clearTimeout(timeoutId); - }), timedRejection]); -} - -export class StreamEnd extends Error {} - -export abstract class GrpcService { - protected api: Api; - - protected constructor(host: string, private name: string, private apiCtor: ServiceFactory, sslCredentials?: ISslCredentials) { - this.api = this.getClient(removeProtocol(host), sslCredentials); - } - - protected getClient(host: string, sslCredentials?: ISslCredentials): Api { - const client = sslCredentials ? - new grpc.Client(host, grpc.credentials.createSsl(sslCredentials.rootCertificates, sslCredentials.clientPrivateKey, sslCredentials.clientCertChain)) : - new grpc.Client(host, grpc.credentials.createInsecure()); - const rpcImpl: $protobuf.RPCImpl = (method, requestData, callback) => { - if(null===method && requestData === null && callback === null) { - // signal `end` from protobuf service - client.close() - return - } - const path = `/${this.name}/${method.name}`; - client.makeUnaryRequest(path, _.identity, _.identity, requestData, callback); - }; - return this.apiCtor.create(rpcImpl); - } -} - -export type MetadataHeaders = Map; -export type ClientOptions = Record; - -export abstract class AuthenticatedService { +import * as $protobuf from "protobufjs"; +import {ISslCredentials} from "../ssl-credentials"; +import * as grpc from "@grpc/grpc-js"; +import _ from "lodash"; +import {IAuthService} from "../credentials"; +import {getVersionHeader} from "../version"; +import {removeProtocol} from "../utils/remove-protocol"; +import {StreamEnd} from "./utils/stream-end"; +import {ServiceFactory} from "../utils/service-factory"; +import {MetadataHeaders} from "../utils/metadata-headers"; +import {ClientOptions} from "../utils/client-options"; +import {getDatabaseHeader} from "../utils/get-database-header"; + +export abstract class TableAuthenticatedService { protected api: Api; private metadata: grpc.Metadata; private responseMetadata: WeakMap; @@ -76,7 +19,7 @@ export abstract class AuthenticatedService { private readonly headers: MetadataHeaders; - static isServiceAsyncMethod(target: object, prop: string|number|symbol, receiver: any) { + static isServiceAsyncMethod(target: object, prop: string | number | symbol, receiver: any) { return ( Reflect.has(target, prop) && typeof Reflect.get(target, prop, receiver) === 'function' && @@ -119,7 +62,7 @@ export abstract class AuthenticatedService { { get: (target, prop, receiver) => { const property = Reflect.get(target, prop, receiver); - return AuthenticatedService.isServiceAsyncMethod(target, prop, receiver) ? + return TableAuthenticatedService.isServiceAsyncMethod(target, prop, receiver) ? async (...args: any[]) => { if (!['emit', 'rpcCall', 'rpcImpl'].includes(String(prop))) { if (args.length) { @@ -128,11 +71,13 @@ export abstract class AuthenticatedService { } this.metadata = await this.authService.getAuthMetadata(); + // console.info(100, this.metadata) for (const [name, value] of this.headers) { if (value) { this.metadata.add(name, value); } } + // console.info(200, this.metadata) return property.call(receiver, ...args); } : @@ -148,6 +93,7 @@ export abstract class AuthenticatedService { new grpc.Client(host, grpc.credentials.createInsecure(), clientOptions); const rpcImpl: $protobuf.RPCImpl = (method, requestData, callback) => { const path = `/${this.name}/${method.name}`; + // console.info(300, path) if (method.name.startsWith('Stream') || this.stremMethods?.findIndex((v) => v === method.name)) { client.makeServerStreamRequest(path, _.identity, _.identity, requestData, this.metadata) .on('data', (data) => callback(null, data)) @@ -166,29 +112,3 @@ export abstract class AuthenticatedService { return this.apiCtor.create(rpcImpl); } } - -export function pessimizable(_target: Pessimizable, _propertyKey: string, descriptor: PropertyDescriptor) { - const originalMethod = descriptor.value; - descriptor.value = async function (this: Pessimizable, ...args: any) { - try { - return await originalMethod.call(this, ...args); - } catch (error) { - if (!(error instanceof NotFound)) { - this.endpoint.pessimize(); - } - throw error; - } - }; - return descriptor; -} - -export async function sleep(milliseconds: number) { - await new Promise((resolve) => setTimeout(resolve, milliseconds)); -} - -export function toLong(value: Long | number): Long { - if (typeof value === 'number') { - return Long.fromNumber(value); - } - return value; -} diff --git a/src/table/table-session.ts b/src/table/table-session.ts index f5d77a7c..85142939 100644 --- a/src/table/table-session.ts +++ b/src/table/table-session.ts @@ -1,7 +1,6 @@ import EventEmitter from 'events'; import * as grpc from '@grpc/grpc-js'; import {google, Ydb} from 'ydb-sdk-proto'; -import {pessimizable, StreamEnd} from '../utils'; import {Endpoint} from '../discovery'; import {ResponseMetadataKeys} from '../constants'; import {Logger} from '../logging'; @@ -27,8 +26,12 @@ import IKeyRange = Ydb.Table.IKeyRange; import TypedValue = Ydb.TypedValue; import BulkUpsertResult = Ydb.Table.BulkUpsertResult; import OperationMode = Ydb.Operations.OperationParams.OperationMode; -import {SessionEvent} from "./session-event"; -import {ensureOperationSucceeded, getOperationPayload, TableAsyncResponse} from "./table-utils"; +import {SessionEvent} from "../utils/session-event"; +import {pessimizable} from "../utils/pessimizable"; +import {StreamEnd} from "./utils/stream-end"; +import {TableAsyncResponse} from "./utils/table-async-response"; +import {getOperationPayload} from "./utils/get-operation-payload"; +import {ensureOperationSucceeded} from "./utils/ensure-operation-succeeded"; interface PartialResponse { status?: (Ydb.StatusIds.StatusCode|null); diff --git a/src/table/table-sessions-pool.ts b/src/table/table-sessions-pool.ts index 89f0d522..3b441fb0 100644 --- a/src/table/table-sessions-pool.ts +++ b/src/table/table-sessions-pool.ts @@ -2,7 +2,6 @@ import {Ydb} from 'ydb-sdk-proto'; import {IAuthService} from "../credentials"; import {ISslCredentials} from "../ssl-credentials"; import {IPoolSettings} from "../driver"; -import {AuthenticatedService, ClientOptions, pessimizable} from "../utils"; import DiscoveryService, {Endpoint} from "../discovery"; import {Logger} from "../logging"; import EventEmitter from "events"; @@ -11,13 +10,16 @@ import _ from "lodash"; import {BadSession, SessionBusy, SessionPoolEmpty} from "../errors"; import {retryable} from "../retries"; import {TableSession} from "./table-session"; -import {SessionEvent} from "./session-event"; +import {SessionEvent} from "../utils/session-event"; import TableService = Ydb.Table.V1.TableService; import CreateSessionRequest = Ydb.Table.CreateSessionRequest; import CreateSessionResult = Ydb.Table.CreateSessionResult; -import {getOperationPayload} from "./table-utils"; +import {TableAuthenticatedService} from "./table-authenticated-service"; +import {pessimizable} from "../utils/pessimizable"; +import {getOperationPayload} from "./utils/get-operation-payload"; +import {ClientOptions} from "../utils/client-options"; -export class TableSessionBuilder extends AuthenticatedService { +export class TableSessionBuilder extends TableAuthenticatedService { public endpoint: Endpoint; private readonly logger: Logger; diff --git a/src/table/table-utils.ts b/src/table/table-utils.ts deleted file mode 100644 index e879856e..00000000 --- a/src/table/table-utils.ts +++ /dev/null @@ -1,36 +0,0 @@ -import {MissingOperation, MissingValue, StatusCode, YdbError} from "../errors"; -import {Ydb} from "ydb-sdk-proto"; - -export interface TableAsyncResponse { - operation?: Ydb.Operations.IOperation | null -} - -export function getOperationPayload(response: TableAsyncResponse): Uint8Array { - const {operation} = response; - - if (operation) { - YdbError.checkStatus(operation); - const value = operation?.result?.value; - if (!value) { - throw new MissingValue('Missing operation result value!'); - } - return value; - } else { - throw new MissingOperation('No operation in response!'); - } -} - -export function ensureOperationSucceeded(response: TableAsyncResponse, suppressedErrors: StatusCode[] = []): void { - try { - getOperationPayload(response); - } catch (error) { - const e = error as any; - if (suppressedErrors.indexOf(e.constructor.status) > -1) { - return; - } - - if (!(e instanceof MissingValue)) { - throw e; - } - } -} diff --git a/src/table/utils/ensure-operation-succeeded.ts b/src/table/utils/ensure-operation-succeeded.ts new file mode 100644 index 00000000..fb0be973 --- /dev/null +++ b/src/table/utils/ensure-operation-succeeded.ts @@ -0,0 +1,18 @@ +import {TableAsyncResponse} from "./table-async-response"; +import {MissingValue, StatusCode} from "../../errors"; +import {getOperationPayload} from "./get-operation-payload"; + +export function ensureOperationSucceeded(response: TableAsyncResponse, suppressedErrors: StatusCode[] = []): void { + try { + getOperationPayload(response); + } catch (error) { + const e = error as any; + if (suppressedErrors.indexOf(e.constructor.status) > -1) { + return; + } + + if (!(e instanceof MissingValue)) { + throw e; + } + } +} diff --git a/src/table/utils/get-operation-payload.ts b/src/table/utils/get-operation-payload.ts new file mode 100644 index 00000000..53b5f2a5 --- /dev/null +++ b/src/table/utils/get-operation-payload.ts @@ -0,0 +1,17 @@ +import {TableAsyncResponse} from "./table-async-response"; +import {MissingOperation, MissingValue, YdbError} from "../../errors"; + +export function getOperationPayload(response: TableAsyncResponse): Uint8Array { + const {operation} = response; + + if (operation) { + YdbError.checkStatus(operation); + const value = operation?.result?.value; + if (!value) { + throw new MissingValue('Missing operation result value!'); + } + return value; + } else { + throw new MissingOperation('No operation in response!'); + } +} diff --git a/src/table/utils/index.ts b/src/table/utils/index.ts new file mode 100644 index 00000000..5ba0d6b7 --- /dev/null +++ b/src/table/utils/index.ts @@ -0,0 +1,4 @@ +export * from './ensure-operation-succeeded'; +export * from './get-operation-payload'; +export * from './stream-end'; +export * from './table-async-response'; diff --git a/src/table/utils/stream-end.ts b/src/table/utils/stream-end.ts new file mode 100644 index 00000000..cb17a641 --- /dev/null +++ b/src/table/utils/stream-end.ts @@ -0,0 +1,2 @@ +export class StreamEnd extends Error { +} diff --git a/src/table/utils/table-async-response.ts b/src/table/utils/table-async-response.ts new file mode 100644 index 00000000..0dd83a96 --- /dev/null +++ b/src/table/utils/table-async-response.ts @@ -0,0 +1,5 @@ +import {Ydb} from "ydb-sdk-proto"; + +export interface TableAsyncResponse { + operation?: Ydb.Operations.IOperation | null +} diff --git a/src/utils/client-options.ts b/src/utils/client-options.ts new file mode 100644 index 00000000..f4d956fd --- /dev/null +++ b/src/utils/client-options.ts @@ -0,0 +1 @@ +export type ClientOptions = Record; diff --git a/src/utils/get-database-header.ts b/src/utils/get-database-header.ts new file mode 100644 index 00000000..95c0101d --- /dev/null +++ b/src/utils/get-database-header.ts @@ -0,0 +1,3 @@ +export function getDatabaseHeader(database: string): [string, string] { + return ['x-ydb-database', database]; +} diff --git a/src/utils/gprc_client/index.ts b/src/utils/gprc_client/index.ts new file mode 100644 index 00000000..18a0f37a --- /dev/null +++ b/src/utils/gprc_client/index.ts @@ -0,0 +1,120 @@ +// same options as for table client, incluiding log +// header + auth metadata +// prefix +// paramters - method, args, res +// output stream +// connected to endpoint + +import {IAuthService} from "../../credentials"; +import {Logger} from "../../logging"; +import {ISslCredentials} from "../../ssl-credentials"; +import {ClientOptions} from "../client-options"; +import * as grpc from "@grpc/grpc-js"; +import {removeProtocol} from "../remove-protocol"; +import pkgInfo from "../../../package.json"; +import {BufferWriter, Writer} from "protobufjs"; + +const SDK_VERSION_HDR = 'x-ydb-sdk-build-info'; +const SDK_VERSION = `ydb-nodejs-sdk/${pkgInfo.version}`; + +const DB_HDR = 'x-ydb-database'; + +const enum GRPC_NS { + TableV1 = 'Ydb.Table.V1.TableService', + QueryV1 = 'Ydb.Query.V1.QueryService', +} + +export class GrpcClient { + // @ts-ignore + private logger: Logger; + private endpoint: string; + private client: grpc.Client; + private database: string; + private authService: IAuthService; + private sslCredentials?: ISslCredentials; + private clientOptions?: ClientOptions; + + constructor(opts: { + logger: Logger, + // TODO: consider named args + host: string, + endpoint: string, + database: string, + authService: IAuthService, + sslCredentials?: ISslCredentials, + clientOptions?: ClientOptions + }) { + this.logger = opts.logger; + + if (!(opts.host || opts.endpoint)) throw new Error('Either "host" or "endpoint" must be specified'); + this.endpoint = opts.host ? removeProtocol(opts.host) : opts.endpoint.toString(); + + this.database = opts.database; + this.authService = opts.authService; + if (opts.sslCredentials) this.sslCredentials = opts.sslCredentials; + if (opts.clientOptions) this.clientOptions = opts.clientOptions; + + this.client = this.sslCredentials ? + new grpc.Client( + this.endpoint, + grpc.credentials.createSsl( + this.sslCredentials.rootCertificates, + this.sslCredentials.clientCertChain, + this.sslCredentials.clientPrivateKey), + this.clientOptions) : + new grpc.Client( + this.endpoint, + grpc.credentials.createInsecure(), + this.clientOptions); + } + + async destroy() { + // not init + } + + async call< + IReq extends { encode: (value: IReq) => BufferWriter | Writer }, + IResp extends { decode: (value: Buffer) => IResp } + >( + namespace: GRPC_NS, + grpcMethod: { + (request: IReq): IResp; + name: string; + }, + reqCtor: IReq, + respCtor: IResp, + request: IReq) { // {metadata. response} + + const reqMetadata = new grpc.Metadata(); + reqMetadata.add(SDK_VERSION_HDR, SDK_VERSION); + reqMetadata.add(DB_HDR, this.database); + for (const [key, value] of Object.entries(await this.authService.getAuthMetadata())) { + reqMetadata.add(key, value); + } + + let respMetadata: grpc.Metadata | undefined; + let response = await new Promise((resolve, reject) => { + const events = this.client.makeUnaryRequest( + `/${namespace}/${grpcMethod.name}`, + (req: IReq) => reqCtor.encode(req).finish() as Buffer, + respCtor.decode, + request, + reqMetadata, + (err, resp) => { + console.info(3200, err, resp); + if (err) reject(err); + resolve(resp!); + }); + events.on('status', ({metadata}) => { + respMetadata = metadata; + }); + }); + return {metadata: respMetadata, response}; + }; + + // callStreamOut(method, request); // event emitter + // + // streamInResponse(method); // {writer, async getResult()} + // + // streamInStreamOut(method); // {writer, event emitter} +} diff --git a/src/utils/grpc-service.ts b/src/utils/grpc-service.ts new file mode 100644 index 00000000..9224d677 --- /dev/null +++ b/src/utils/grpc-service.ts @@ -0,0 +1,30 @@ +import * as $protobuf from "protobufjs"; +import {ServiceFactory} from "./service-factory"; +import {ISslCredentials} from "../ssl-credentials"; +import {removeProtocol} from "./remove-protocol"; +import * as grpc from "@grpc/grpc-js"; +import _ from "lodash"; + +export abstract class GrpcService { + protected api: Api; + + protected constructor(host: string, private name: string, private apiCtor: ServiceFactory, sslCredentials?: ISslCredentials) { + this.api = this.getClient(removeProtocol(host), sslCredentials); + } + + protected getClient(host: string, sslCredentials?: ISslCredentials): Api { + const client = sslCredentials ? + new grpc.Client(host, grpc.credentials.createSsl(sslCredentials.rootCertificates, sslCredentials.clientPrivateKey, sslCredentials.clientCertChain)) : + new grpc.Client(host, grpc.credentials.createInsecure()); + const rpcImpl: $protobuf.RPCImpl = (method, requestData, callback) => { + if (null === method && requestData === null && callback === null) { + // signal `end` from protobuf service + client.close() + return + } + const path = `/${this.name}/${method.name}`; + client.makeUnaryRequest(path, _.identity, _.identity, requestData, callback); + }; + return this.apiCtor.create(rpcImpl); + } +} diff --git a/src/utils/index.ts b/src/utils/index.ts new file mode 100644 index 00000000..3c45911d --- /dev/null +++ b/src/utils/index.ts @@ -0,0 +1,11 @@ +export * from './client-options'; +export * from './get-database-header'; +export * from './grpc-service'; +export * from './metadata-headers'; +export * from './pessimizable'; +export * from './remove-protocol'; +export * from './service-factory'; +export * from './session-event'; +export * from './sleep'; +export * from './to-long'; +export * from './with-timeout'; diff --git a/src/utils/metadata-headers.ts b/src/utils/metadata-headers.ts new file mode 100644 index 00000000..80131b7f --- /dev/null +++ b/src/utils/metadata-headers.ts @@ -0,0 +1 @@ +export type MetadataHeaders = Map; diff --git a/src/utils/pessimizable.ts b/src/utils/pessimizable.ts new file mode 100644 index 00000000..9aa20caa --- /dev/null +++ b/src/utils/pessimizable.ts @@ -0,0 +1,21 @@ +import {NotFound} from "../errors"; +import {Endpoint} from "../discovery"; + +export interface Pessimizable { + endpoint: Endpoint; +} + +export function pessimizable(_target: Pessimizable, _propertyKey: string, descriptor: PropertyDescriptor) { + const originalMethod = descriptor.value; + descriptor.value = async function (this: Pessimizable, ...args: any) { + try { + return await originalMethod.call(this, ...args); + } catch (error) { + if (!(error instanceof NotFound)) { + this.endpoint.pessimize(); + } + throw error; + } + }; + return descriptor; +} diff --git a/src/utils/remove-protocol.ts b/src/utils/remove-protocol.ts new file mode 100644 index 00000000..6d071922 --- /dev/null +++ b/src/utils/remove-protocol.ts @@ -0,0 +1,5 @@ +export function removeProtocol(endpoint: string) { + const re = /^(grpc:\/\/|grpcs:\/\/)?(.+)/; + const match = re.exec(endpoint) as string[]; + return match[2]; +} diff --git a/src/utils/service-factory.ts b/src/utils/service-factory.ts new file mode 100644 index 00000000..23c92f01 --- /dev/null +++ b/src/utils/service-factory.ts @@ -0,0 +1,5 @@ +import * as $protobuf from "protobufjs"; + +export type ServiceFactory = { + create(rpcImpl: $protobuf.RPCImpl, requestDelimited?: boolean, responseDelimited?: boolean): T +}; diff --git a/src/table/session-event.ts b/src/utils/session-event.ts similarity index 100% rename from src/table/session-event.ts rename to src/utils/session-event.ts diff --git a/src/utils/sleep.ts b/src/utils/sleep.ts new file mode 100644 index 00000000..50966440 --- /dev/null +++ b/src/utils/sleep.ts @@ -0,0 +1,3 @@ +export async function sleep(milliseconds: number) { + await new Promise((resolve) => setTimeout(resolve, milliseconds)); +} diff --git a/src/utils/to-long.ts b/src/utils/to-long.ts new file mode 100644 index 00000000..19a3dc26 --- /dev/null +++ b/src/utils/to-long.ts @@ -0,0 +1,8 @@ +import Long from "long"; + +export function toLong(value: Long | number): Long { + if (typeof value === 'number') { + return Long.fromNumber(value); + } + return value; +} diff --git a/src/utils/with-timeout.ts b/src/utils/with-timeout.ts new file mode 100644 index 00000000..827e1a94 --- /dev/null +++ b/src/utils/with-timeout.ts @@ -0,0 +1,13 @@ +import {TimeoutExpired} from "../errors"; + +export function withTimeout(promise: Promise, timeoutMs: number): Promise { + let timeoutId: NodeJS.Timeout; + const timedRejection: Promise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject(new TimeoutExpired(`Timeout of ${timeoutMs}ms has expired`)); + }, timeoutMs); + }); + return Promise.race([promise.finally(() => { + clearTimeout(timeoutId); + }), timedRejection]); +} diff --git a/src/uuid.ts b/src/uuid.ts index e82d3d1e..ad892717 100644 --- a/src/uuid.ts +++ b/src/uuid.ts @@ -1,8 +1,8 @@ import {Ydb} from 'ydb-sdk-proto'; import * as uuid from 'uuid'; import Long from 'long'; -import {toLong} from './utils'; import IValue = Ydb.IValue; +import {toLong} from "./utils/to-long"; /** * Every UUID string value represents as hex digits displayed in five groups separated by hyphens: