diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index 9d4968f9..df8e8445 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,36 +1,60 @@ -import {Driver as YDB} from '../../src'; -import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service'; -import {SimpleLogger} from "../../src/logger/simple-logger"; +import {Driver as YDB, getCredentialsFromEnv} from 'ydb-sdk'; import {Ydb} from "ydb-sdk-proto"; -import {Context} from "../../src/context"; +import {getDefaultLogger} from "../../src/logger/get-default-logger"; +import {main} from "../utils"; +import Codec = Ydb.Topic.Codec; +import {Context} from "ydb-sdk/build/cjs/src/context/context"; require('dotenv').config(); const DATABASE = '/local'; const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; -async function main() { +async function run() { + const logger = getDefaultLogger(); + const authService = getCredentialsFromEnv(logger); const db = new YDB({ - endpoint: ENDPOINT, - database: DATABASE, - authService: new AnonymousAuthService(), - logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), + endpoint: ENDPOINT, // i.e.: grc(s):// + database: DATABASE, // i.e.: '/local' + authService, logger + // logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), }); if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); try { await db.topic.createTopic({ path: 'demoTopic', + supportedCodecs: { + codecs: [Ydb.Topic.Codec.CODEC_RAW], + }, + partitioningSettings: { + minActivePartitions: 3, + }, consumers: [{ - name: 'demo', + name: 'demoConsumer', }], }); + + await db.topic.alterTopic({ + path: 'demoTopic', + addConsumers: [{ + name: 'anotherqDemoConsumer', + }], + setSupportedCodecs: { + codecs: [Ydb.Topic.Codec.CODEC_RAW, Codec.CODEC_GZIP], + }, + }); + + logger.info(await db.topic.describeTopic({ + path: 'demoTopic', + })); + const writer = await db.topic.createWriter({ path: 'demoTopic', // producerId: '...', // will be genereted automatically // messageGroupId: '...' // will be the same as producerId getLastSeqNo: true, // seqNo will be assigned automatically }); - await writer.sendMessages({ + await writer.send({ codec: Ydb.Topic.Codec.CODEC_RAW, messages: [{ data: Buffer.from('Hello, world'), @@ -39,24 +63,34 @@ async function main() { }); const promises = []; for (let n = 0; n < 4; n++) { - promises.push(writer.sendMessages({ + promises.push(writer.send({ codec: Ydb.Topic.Codec.CODEC_RAW, messages: [{ data: Buffer.from(`Message N${n}`), uncompressedSize: `Message N${n}`.length, + metadataItems: [ + { + key: 'key', + value: new TextEncoder().encode('value'), + }, + { + key: 'key2', + value: new TextEncoder().encode('value2'), + } + ], }], - })); + })); } - await writer.close(); + await writer.close(); // // graceful close() - will finish after receiving confirmation that all messages have been processed by the server + // await Promise.all(promises); // another option - await Promise.all(promises); const reader = await db.topic.createReader(Context.createNew({ timeout: 3000, }).ctx, { topicsReadSettings: [{ path: 'demoTopic', }], - consumer: 'demo', + consumer: 'demoConsumer', receiveBufferSizeInBytes: 10_000_000, }); try { @@ -68,10 +102,15 @@ async function main() { if (!Context.isTimeout(err)) throw err; console.info('Timeout is over!'); } - await reader.close(true); // graceful close() - complete when all messages are commited + await reader.close(); // graceful close() - will complete when processing of all currently processed messages will finish + + await db.topic.dropTopic({ + path: 'demoTopic', + }); + } finally { await db.destroy(); } } -main(); +main(run); diff --git a/src/__tests__/e2e/topic-service/internal.test.ts.md b/src/__tests__/e2e/topic-service/internal.test.ts.md index 4ad0e44b..2fe97b42 100644 --- a/src/__tests__/e2e/topic-service/internal.test.ts.md +++ b/src/__tests__/e2e/topic-service/internal.test.ts.md @@ -12,7 +12,7 @@ import { ReadStreamStartPartitionSessionArgs } from "../../../topic/internal/topic-read-stream-with-events"; import {WriteStreamInitResult, WriteStreamWriteResult} from "../../../topic/internal/topic-write-stream-with-events"; -import {TopicNodeClient} from "../../../topic/internal/topic-node-client"; +import {TopicClientOnParticularNode} from "../../../topic/internal/topic-node-client"; import {Context} from "../../../context"; import {RetryParameters} from "../../../retries/retryParameters"; import {RetryStrategy} from "../../../retries/retryStrategy"; @@ -24,7 +24,7 @@ const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; describe('Topic: General', () => { let discoveryService: DiscoveryService; - let topicService: TopicNodeClient; + let topicService: TopicClientOnParticularNode; const ctx = Context.createNew().ctx; beforeEach(async () => { @@ -191,7 +191,7 @@ describe('Topic: General', () => { logger, }); await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD); - topicService = new TopicNodeClient( + topicService = new TopicClientOnParticularNode( await discoveryService.getEndpoint(), // TODO: Should be one per endpoint DATABASE, authService, diff --git a/src/context/context.ts b/src/context/context.ts index 9907b3e4..6bd5152f 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -154,7 +154,7 @@ export class Context { } /** - * Makes a promise cancellable through context, if the context allows cancel or has a timeout. + * Makes a pr omise cancellable through context, if the context allows cancel or has a timeout. */ public cancelRace(promise: Promise): Promise { if (!this.onCancel) return promise; diff --git a/src/discovery/discovery-service.ts b/src/discovery/discovery-service.ts index eaf6a66d..3b75bdee 100644 --- a/src/discovery/discovery-service.ts +++ b/src/discovery/discovery-service.ts @@ -8,7 +8,7 @@ import {retryable} from "../retries_obsoleted"; import {getOperationPayload} from "../utils/process-ydb-operation-result"; import {AuthenticatedService, withTimeout} from "../utils"; import {Logger} from "../logger/simple-logger"; -import {TopicNodeClient} from "../topic/internal/topic-node-client"; +import {InternalTopicClient} from "../topic/internal/internal-topic-client"; import {IDiscoverySettings} from "../client/settings"; type FailureDiscoveryHandler = (err: Error) => void; @@ -146,7 +146,7 @@ export default class DiscoveryService extends AuthenticatedService void; @@ -18,7 +18,7 @@ export class Endpoint extends Ydb.Discovery.EndpointInfo { private pessimizedAt: DateTime | null; - public topicNodeClient?: TopicNodeClient; + public topicNodeClient?: InternalTopicClient; static fromString(host: string) { const match = Endpoint.HOST_RE.exec(host); diff --git a/src/query/query-session-execute.ts b/src/query/query-session-execute.ts index f8a8191f..db0ccb0b 100644 --- a/src/query/query-session-execute.ts +++ b/src/query/query-session-execute.ts @@ -20,6 +20,35 @@ import {CtxUnsubcribe} from "../context"; import IExecuteQueryRequest = Ydb.Query.IExecuteQueryRequest; import IColumn = Ydb.IColumn; +export type IExecuteArgs = { + /** + * SQL query / DDL etc. + * + */ + text: string, + /** + * Default value is SYNTAX_YQL_V1. + */ + syntax?: Ydb.Query.Syntax, + /** + * SQL query parameters. + */ + parameters?: { [k: string]: Ydb.ITypedValue }, + txControl?: Ydb.Query.ITransactionControl, + execMode?: Ydb.Query.ExecMode, + statsMode?: Ydb.Query.StatsMode, + concurrentResultSets?: boolean, + /** + * Operation timeout in ms + */ + // timeout?: number, // TODO: that make sense to timeout one op? + /** + * Default Native. + */ + rowMode?: RowType, + idempotent?: boolean, +}; + export type IExecuteResult = { resultSets: AsyncGenerator, execStats?: Ydb.TableStats.IQueryStats; @@ -51,49 +80,22 @@ export const enum RowType { * Finishes when the first data block is received or when the end of the stream is received. So if you are sure * that the operation does not return any data, you may not process resultSets. */ -export function execute(this: QuerySession, opts: { - /** - * SQL query / DDL etc. - * - */ - text: string, - /** - * Default value is SYNTAX_YQL_V1. - */ - syntax?: Ydb.Query.Syntax, - /** - * SQL query parameters. - */ - parameters?: { [k: string]: Ydb.ITypedValue }, - txControl?: Ydb.Query.ITransactionControl, - execMode?: Ydb.Query.ExecMode, - statsMode?: Ydb.Query.StatsMode, - concurrentResultSets?: boolean, - /** - * Operation timeout in ms - */ - // timeout?: number, // TODO: that make sense to timeout one op? - /** - * Default Native. - */ - rowMode?: RowType, - idempotent?: boolean, -}): Promise { - // Validate opts - if (!opts.text.trim()) throw new Error('"text" parameter is empty') - if (opts.parameters) - Object.keys(opts.parameters).forEach(n => { +export function execute(this: QuerySession, args: IExecuteArgs): Promise { + // Validate args + if (!args.text.trim()) throw new Error('"text" parameter is empty') + if (args.parameters) + Object.keys(args.parameters).forEach(n => { if (!n.startsWith('$')) throw new Error(`Parameter name must start with "$": ${n}`); }) - if (opts.txControl && this[sessionTxSettingsSymbol]) + if (args.txControl && this[sessionTxSettingsSymbol]) throw new Error(CANNOT_MANAGE_TRASACTIONS_ERROR); - if (opts.txControl?.txId) + if (args.txControl?.txId) throw new Error('Cannot contain txControl.txId because the current session transaction is used (see session.txId)'); if (this[sessionTxIdSymbol]) { - if (opts.txControl?.beginTx) + if (args.txControl?.beginTx) throw new Error('txControl.beginTx when there\'s already an open transaction'); } else { - if (opts.txControl?.commitTx && !opts.txControl?.beginTx) + if (args.txControl?.commitTx && !args.txControl?.beginTx) throw new Error('txControl.commitTx === true when no open transaction and there\'s no txControl.beginTx'); } @@ -101,23 +103,23 @@ export function execute(this: QuerySession, opts: { const executeQueryRequest: IExecuteQueryRequest = { sessionId: this.sessionId, queryContent: { - text: opts.text, - syntax: opts.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1, + text: args.text, + syntax: args.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1, }, - execMode: opts.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE, + execMode: args.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE, }; - if (opts.statsMode) executeQueryRequest.statsMode = opts.statsMode; - if (opts.parameters) executeQueryRequest.parameters = opts.parameters; + if (args.statsMode) executeQueryRequest.statsMode = args.statsMode; + if (args.parameters) executeQueryRequest.parameters = args.parameters; if (this[sessionTxSettingsSymbol] && !this[sessionTxIdSymbol]) executeQueryRequest.txControl = {beginTx: this[sessionTxSettingsSymbol], commitTx: false}; - else if (opts.txControl) - executeQueryRequest.txControl = opts.txControl; + else if (args.txControl) + executeQueryRequest.txControl = args.txControl; if (this[sessionTxIdSymbol]) (executeQueryRequest.txControl || (executeQueryRequest.txControl = {})).txId = this[sessionTxIdSymbol]; - executeQueryRequest.concurrentResultSets = opts.concurrentResultSets ?? false; - if (opts.hasOwnProperty('idempotent')) { + executeQueryRequest.concurrentResultSets = args.concurrentResultSets ?? false; + if (args.hasOwnProperty('idempotent')) { if (this[isIdempotentDoLevelSymbol]) throw new Error('The attribute of idempotency is already set at the level of do()'); - if (opts.idempotent) this[isIdempotentSymbol] = true; + if (args.idempotent) this[isIdempotentSymbol] = true; } // Run the operation @@ -197,13 +199,13 @@ export function execute(this: QuerySession, opts: { let resultSetTuple = resultSetByIndex[index]; if (!resultSetTuple) { iterator = buildAsyncQueueIterator(); - switch (opts.rowMode) { + switch (args.rowMode) { case RowType.Ydb: - resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], opts.rowMode ?? RowType.Native, iterator); + resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], args.rowMode ?? RowType.Native, iterator); break; default: // Native const nativeColumnsNames = (partialResp.resultSet!.columns as IColumn[]).map(v => snakeToCamelCaseConversion.ydbToJs(v.name!)); - resultSet = new ResultSet(index, nativeColumnsNames, opts.rowMode ?? RowType.Native, iterator); + resultSet = new ResultSet(index, nativeColumnsNames, args.rowMode ?? RowType.Native, iterator); resultSet[resultsetYdbColumnsSymbol] = partialResp.resultSet!.columns as IColumn[]; } resultSetIterator.push(resultSet); @@ -216,7 +218,7 @@ export function execute(this: QuerySession, opts: { [iterator, resultSet] = resultSetTuple; } - switch (opts.rowMode) { + switch (args.rowMode) { case RowType.Ydb: for (const row of partialResp.resultSet!.rows!) iterator.push(row); break; diff --git a/src/topic/internal/topic-node-client.ts b/src/topic/internal/internal-topic-client.ts similarity index 55% rename from src/topic/internal/topic-node-client.ts rename to src/topic/internal/internal-topic-client.ts index 0576c834..9dc56af8 100644 --- a/src/topic/internal/topic-node-client.ts +++ b/src/topic/internal/internal-topic-client.ts @@ -6,8 +6,8 @@ import ICreateTopicResult = Ydb.Topic.ICreateTopicResult; import {AuthenticatedService, ClientOptions} from "../../utils"; import {IAuthService} from "../../credentials/i-auth-service"; import {ISslCredentials} from "../../utils/ssl-credentials"; -import {TopicWriteStreamWithEvents, WriteStreamInitArgs} from "./topic-write-stream-with-events"; -import {TopicReadStreamWithEvents, ReadStreamInitArgs} from "./topic-read-stream-with-events"; +import {InternalTopicWriteStream, InternalWriteStreamInitArgs} from "./internal-topic-write-stream"; +import {InternalTopicReadStream, InternalReadStreamInitArgs} from "./internal-topic-read-stream"; import {v4 as uuid_v4} from 'uuid'; import {Context} from "../../context"; import * as grpc from "@grpc/grpc-js"; @@ -15,38 +15,37 @@ import * as grpc from "@grpc/grpc-js"; // TODO: Retries with the same options // TODO: Batches // TODO: Zip compression -// TODO: Regular auth token update // TODO: Graceful shutdown and close -export type CommitOffsetArgs = +export type InternalCommitOffsetArgs = Ydb.Topic.ICommitOffsetRequest & Required>; -export type CommitOffsetResult = Readonly; +export type InternalCommitOffsetResult = Readonly; -export type UpdateOffsetsInTransactionArgs = +export type InternalUpdateOffsetsInTransactionArgs = Ydb.Topic.IUpdateOffsetsInTransactionRequest & Required>; -export type UpdateOffsetsInTransactionResult = Readonly; +export type InternalUpdateOffsetsInTransactionResult = Readonly; -export type CreateTopicArgs = Ydb.Topic.ICreateTopicRequest & Required>; -export type CreateTopicResult = Readonly; +export type InternalCreateTopicArgs = Ydb.Topic.ICreateTopicRequest & Required>; +export type InternalCreateTopicResult = Readonly; -export type DescribeTopicArgs = +export type InternalDescribeTopicArgs = Ydb.Topic.IDescribeTopicRequest & Required>; -export type DescribeTopicResult = Readonly; +export type InternalDescribeTopicResult = Readonly; -export type DescribeConsumerArgs = +export type InternalDescribeConsumerArgs = Ydb.Topic.IDescribeConsumerRequest & Required>; -export type DescribeConsumerResult = Readonly; +export type InternalDescribeConsumerResult = Readonly; -export type AlterTopicArgs = Ydb.Topic.IAlterTopicRequest & Required>; -export type AlterTopicResult = Readonly -export type DropTopicArgs = Ydb.Topic.IDropTopicRequest & Required>; -export type DropTopicResult = Readonly; +export type InternalAlterTopicArgs = Ydb.Topic.IAlterTopicRequest & Required>; +export type InternalAlterTopicResult = Readonly +export type InternalDropTopicArgs = Ydb.Topic.IDropTopicRequest & Required>; +export type InternalDropTopicResult = Readonly; -export class TopicNodeClient extends AuthenticatedService implements ICreateTopicResult { +export class InternalTopicClient extends AuthenticatedService implements ICreateTopicResult { public endpoint: Endpoint; private readonly logger: Logger; private allStreams: { close(ctx: Context, fakeError?: Error): void }[] = []; @@ -78,14 +77,14 @@ export class TopicNodeClient extends AuthenticatedService) { + public async openWriteStreamWithEvents(ctx: Context, args: InternalWriteStreamInitArgs & Pick) { if (args.producerId === undefined || args.producerId === null) { const newGUID = uuid_v4(); args = {...args, producerId: newGUID, messageGroupId: newGUID} } else if (args.messageGroupId === undefined || args.messageGroupId === null) { args = {...args, messageGroupId: args.producerId}; } - const writerStream = new TopicWriteStreamWithEvents(ctx, args, this, this.logger); + const writerStream = new InternalTopicWriteStream(ctx, args, this, this.logger); writerStream.events.once('end', () => { const index = this.allStreams.findIndex(v => v === writerStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -95,8 +94,8 @@ export class TopicNodeClient extends AuthenticatedService { const index = this.allStreams.findIndex(v => v === readStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -106,31 +105,31 @@ export class TopicNodeClient extends AuthenticatedService> & {receiveBufferSizeInBytes: number}; -export type ReadStreamInitResult = Readonly; +export type InternalReadStreamInitResult = Readonly; -export type ReadStreamReadArgs = Ydb.Topic.StreamReadMessage.IReadRequest; -export type ReadStreamReadResult = Readonly; +export type InternalReadStreamReadArgs = Ydb.Topic.StreamReadMessage.IReadRequest; +export type InternalReadStreamReadResult = Readonly; -export type ReadStreamCommitOffsetArgs = Ydb.Topic.StreamReadMessage.ICommitOffsetRequest; -export type ReadStreamCommitOffsetResult = Readonly; +export type InternalReadStreamCommitOffsetArgs = Ydb.Topic.StreamReadMessage.ICommitOffsetRequest; +export type InternalReadStreamCommitOffsetResult = Readonly; -export type ReadStreamPartitionSessionStatusArgs = Ydb.Topic.StreamReadMessage.IPartitionSessionStatusRequest; -export type ReadStreamPartitionSessionStatusResult = Readonly; +export type InternalReadStreamPartitionSessionStatusArgs = Ydb.Topic.StreamReadMessage.IPartitionSessionStatusRequest; +export type InternalReadStreamPartitionSessionStatusResult = Readonly; -export type ReadStreamUpdateTokenArgs = Ydb.Topic.IUpdateTokenRequest; -export type ReadStreamUpdateTokenResult = Readonly; +export type InternalReadStreamUpdateTokenArgs = Ydb.Topic.IUpdateTokenRequest; +export type InternalReadStreamUpdateTokenResult = Readonly; -export type ReadStreamStartPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStartPartitionSessionRequest; -export type ReadStreamStartPartitionSessionResult = Readonly; +export type InternalReadStreamStartPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStartPartitionSessionRequest; +export type InternalReadStreamStartPartitionSessionResult = Readonly; -export type ReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStopPartitionSessionRequest; -export type ReadStreamStopPartitionSessionResult = Readonly; +export type InternalReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStopPartitionSessionRequest; +export type InternalReadStreamStopPartitionSessionResult = Readonly; export type ReadStreamEvents = { - initResponse: (resp: ReadStreamInitResult) => void, - readResponse: (resp: ReadStreamReadResult) => void, - commitOffsetResponse: (resp: ReadStreamCommitOffsetResult) => void, - partitionSessionStatusResponse: (resp: ReadStreamPartitionSessionStatusResult) => void, - startPartitionSessionRequest: (resp: ReadStreamStartPartitionSessionArgs) => void, - stopPartitionSessionRequest: (resp: ReadStreamStopPartitionSessionArgs) => void, - updateTokenResponse: (resp: ReadStreamUpdateTokenResult) => void, + initResponse: (resp: InternalReadStreamInitResult) => void, + readResponse: (resp: InternalReadStreamReadResult) => void, + commitOffsetResponse: (resp: InternalReadStreamCommitOffsetResult) => void, + partitionSessionStatusResponse: (resp: InternalReadStreamPartitionSessionStatusResult) => void, + startPartitionSessionRequest: (resp: InternalReadStreamStartPartitionSessionArgs) => void, + stopPartitionSessionRequest: (resp: InternalReadStreamStopPartitionSessionArgs) => void, + updateTokenResponse: (resp: InternalReadStreamUpdateTokenResult) => void, error: (err: Error) => void, end: (cause: Error) => void, } @@ -53,7 +53,7 @@ export const enum TopicWriteStreamState { Closed } -export class TopicReadStreamWithEvents { +export class InternalTopicReadStream { public events = new EventEmitter() as TypedEmitter; private reasonForClose?: Error; @@ -61,8 +61,8 @@ export class TopicReadStreamWithEvents { constructor( ctx: Context, - args: ReadStreamInitArgs, - private topicService: TopicNodeClient, + args: InternalReadStreamInitArgs, + private topicService: InternalTopicClient, // @ts-ignore public readonly logger: Logger) { this.logger.trace('%s: new TopicReadStreamWithEvents()', ctx); @@ -113,7 +113,7 @@ export class TopicReadStreamWithEvents { this.initRequest(ctx, args); }; - private initRequest(ctx: Context, args: ReadStreamInitArgs) { + private initRequest(ctx: Context, args: InternalReadStreamInitArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.initRequest()', ctx); this.readBidiStream!.write( Ydb.Topic.StreamReadMessage.create({ @@ -121,7 +121,7 @@ export class TopicReadStreamWithEvents { })); } - public async readRequest(ctx: Context, args: ReadStreamReadArgs) { + public async readRequest(ctx: Context, args: InternalReadStreamReadArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.readRequest()', ctx); if (!this.readBidiStream) throw new Error('Stream is closed') await this.updateToken(ctx); @@ -131,7 +131,7 @@ export class TopicReadStreamWithEvents { })); } - public async commitOffsetRequest(ctx: Context, args: ReadStreamCommitOffsetArgs) { + public async commitOffsetRequest(ctx: Context, args: InternalReadStreamCommitOffsetArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.commitOffsetRequest()', ctx); if (!this.readBidiStream) { const err = new Error('Inner stream where from the message was received is closed. The message needs to be re-processed.'); @@ -145,7 +145,7 @@ export class TopicReadStreamWithEvents { })); } - public async partitionSessionStatusRequest(ctx: Context, args: ReadStreamPartitionSessionStatusArgs) { + public async partitionSessionStatusRequest(ctx: Context, args: InternalReadStreamPartitionSessionStatusArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.partitionSessionStatusRequest()', ctx); if (!this.readBidiStream) throw new Error('Stream is closed') await this.updateToken(ctx); @@ -155,7 +155,7 @@ export class TopicReadStreamWithEvents { })); } - public async updateTokenRequest(ctx: Context, args: ReadStreamUpdateTokenArgs) { + public async updateTokenRequest(ctx: Context, args: InternalReadStreamUpdateTokenArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.updateTokenRequest()', ctx); if (!this.readBidiStream) throw new Error('Stream is closed') await this.updateToken(ctx); @@ -166,7 +166,7 @@ export class TopicReadStreamWithEvents { // TODO: process response } - public async startPartitionSessionResponse(ctx: Context, args: ReadStreamStartPartitionSessionResult) { + public async startPartitionSessionResponse(ctx: Context, args: InternalReadStreamStartPartitionSessionResult) { this.logger.trace('%s: TopicReadStreamWithEvents.startPartitionSessionResponse()', ctx); if (!this.readBidiStream) throw new Error('Stream is closed') await this.updateToken(ctx); @@ -176,7 +176,7 @@ export class TopicReadStreamWithEvents { })); } - public async stopPartitionSessionResponse(ctx: Context, args: ReadStreamStopPartitionSessionResult) { + public async stopPartitionSessionResponse(ctx: Context, args: InternalReadStreamStopPartitionSessionResult) { this.logger.trace('%s: TopicReadStreamWithEvents.stopPartitionSessionResponse()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); diff --git a/src/topic/internal/topic-write-stream-with-events.ts b/src/topic/internal/internal-topic-write-stream.ts similarity index 83% rename from src/topic/internal/topic-write-stream-with-events.ts rename to src/topic/internal/internal-topic-write-stream.ts index d1d3f736..a78ab8bf 100644 --- a/src/topic/internal/topic-write-stream-with-events.ts +++ b/src/topic/internal/internal-topic-write-stream.ts @@ -1,6 +1,6 @@ import {Logger} from "../../logger/simple-logger"; import {Ydb} from "ydb-sdk-proto"; -import {TopicNodeClient} from "./topic-node-client"; +import {InternalTopicClient} from "./internal-topic-client"; import EventEmitter from "events"; import TypedEmitter from "typed-emitter/rxjs"; import {ClientDuplexStream} from "@grpc/grpc-js/build/src/call"; @@ -9,34 +9,34 @@ import {Context} from "../../context"; import {getTokenFromMetadata} from "../../credentials/add-credentials-to-metadata"; import {StatusObject} from "@grpc/grpc-js"; -export type WriteStreamInitArgs = - // Currently, messageGroupId must always equal producerId. This enforced in the TopicNodeClient.openWriteStreamWithEvents method +export type InternalWriteStreamInitArgs = + // Currently, messageGroupId must always be equal to producerId. This enforced in the TopicClientOnParticularNode.openWriteStreamWithEvents method Omit & Required>; -export type WriteStreamInitResult = +export type InternalWriteStreamInitResult = Readonly; -export type WriteStreamWriteArgs = +export type InternalWriteStreamWriteArgs = Ydb.Topic.StreamWriteMessage.IWriteRequest & Required>; -export type WriteStreamWriteResult = +export type InternalWriteStreamWriteResult = Ydb.Topic.StreamWriteMessage.IWriteResponse; -export type WriteStreamUpdateTokenArgs = +export type InternalWriteStreamUpdateTokenArgs = Ydb.Topic.IUpdateTokenRequest & Required>; -export type WriteStreamUpdateTokenResult = +export type InternalWriteStreamUpdateTokenResult = Readonly; export type WriteStreamEvents = { - initResponse: (resp: WriteStreamInitResult) => void, - writeResponse: (resp: WriteStreamWriteResult) => void, - updateTokenResponse: (resp: WriteStreamUpdateTokenResult) => void, + initResponse: (resp: InternalWriteStreamInitResult) => void, + writeResponse: (resp: InternalWriteStreamWriteResult) => void, + updateTokenResponse: (resp: InternalWriteStreamUpdateTokenResult) => void, error: (err: Error) => void, end: (cause: Error) => void, } -export class TopicWriteStreamWithEvents { +export class InternalTopicWriteStream { private reasonForClose?: Error; private writeBidiStream: ClientDuplexStream; @@ -44,8 +44,8 @@ export class TopicWriteStreamWithEvents { constructor( ctx: Context, - args: WriteStreamInitArgs, - private topicService: TopicNodeClient, + args: InternalWriteStreamInitArgs, + private topicService: InternalTopicClient, // @ts-ignore private logger: Logger) { this.logger.trace('%s: new TopicWriteStreamWithEvents|()', ctx); @@ -91,7 +91,7 @@ export class TopicWriteStreamWithEvents { this.initRequest(ctx, args); }; - private initRequest(ctx: Context, args: WriteStreamInitArgs) { + private initRequest(ctx: Context, args: InternalWriteStreamInitArgs) { this.logger.trace('%s: TopicWriteStreamWithEvents.initRequest()', ctx); // TODO: Consider zod.js this.writeBidiStream!.write( @@ -103,7 +103,7 @@ export class TopicWriteStreamWithEvents { })); } - public async writeRequest(ctx: Context, args: WriteStreamWriteArgs) { + public async writeRequest(ctx: Context, args: InternalWriteStreamWriteArgs) { this.logger.trace('%s: TopicWriteStreamWithEvents.writeRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); @@ -113,7 +113,7 @@ export class TopicWriteStreamWithEvents { })); } - public async updateTokenRequest(ctx: Context, args: WriteStreamUpdateTokenArgs) { + public async updateTokenRequest(ctx: Context, args: InternalWriteStreamUpdateTokenArgs) { this.logger.trace('%s: TopicWriteStreamWithEvents.updateTokenRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index eafa9c64..a5afdda4 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -1,27 +1,136 @@ -import { - TopicNodeClient, - AlterTopicArgs, AlterTopicResult, - CommitOffsetArgs, CommitOffsetResult, - CreateTopicArgs, CreateTopicResult, - DescribeConsumerArgs, DescribeConsumerResult, - DescribeTopicArgs, DescribeTopicResult, - DropTopicArgs, DropTopicResult, - UpdateOffsetsInTransactionArgs, UpdateOffsetsInTransactionResult -} from "./internal/topic-node-client"; -import EventEmitter from "events"; -import {WriteStreamInitArgs} from "./internal/topic-write-stream-with-events"; -import {ReadStreamInitArgs} from "./internal/topic-read-stream-with-events"; +import {InternalWriteStreamInitArgs} from "./internal/internal-topic-write-stream"; +import {InternalReadStreamInitArgs} from "./internal/internal-topic-read-stream"; import {TopicWriter} from "./topic-writer"; import {Context, ensureContext} from "../context"; import {IClientSettings} from "../client/settings"; import {TopicReader} from "./topic-reader"; import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambda"; +import {google, Ydb} from "ydb-sdk-proto"; +import {InternalTopicClient} from "./internal/internal-topic-client"; -export class TopicClient extends EventEmitter { // TODO: Reconsider why I need to have EventEmitter in any client - private service?: TopicNodeClient; +// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation + +export type ICommitOffsetArgs = { + path: (string|null); + partitionId?: (number|Long|null); + consumer: (string|null); + offset: (number|Long|null); +} +export type IUpdateOffsetsInTransactionArgs = { + operationParams?: (Ydb.Operations.IOperationParams|null); + tx?: (Ydb.Topic.ITransactionIdentity|null); + topics: Ydb.Topic.UpdateOffsetsInTransactionRequest.ITopicOffsets[]; + consumer: string; +} +export type ICreateTopicArgs = { + path: (string|null); + partitioningSettings?: ({ + minActivePartitions?: (number|Long|null); + partitionCountLimit?: (number|Long|null); + }|null); + retentionPeriod?: (google.protobuf.ITimestamp|null); + retentionStorageMb?: (number|Long|null); + supportedCodecs?: ({ + codecs?: (number[]|null); + }|null); + partitionWriteSpeedBytesPerSecond?: (number|Long|null); + partitionWriteBurstBytes?: (number|Long|null); + attributes?: ({ [k: string]: string }|null); + consumers?: ({ + name?: (string|null); + important?: (boolean|null); + readFrom?: (google.protobuf.ITimestamp|null); + supportedCodecs?: ({ + codecs?: (number[]|null); + }|null); + attributes?: ({ [k: string]: string }|null); + consumerStats?: ({ + minPartitionsLastReadTime?: (google.protobuf.ITimestamp|null); + maxReadTimeLag?: (google.protobuf.IDuration|null); + maxWriteTimeLag?: (google.protobuf.IDuration|null); + bytesRead?: ({ + perMinute?: (number|Long|null); + perHour?: (number|Long|null); + perDay?: (number|Long|null); + }|null); + }|null); + }[]|null); + meteringMode?: (Ydb.Topic.MeteringMode|null); // UNSPECIFIED, RESERVED_CAPACITY, REQUEST_UNITS +}; +export type IDescribeTopicArgs = { + path: string; + includeStats?: (boolean|null); +} +export type IDescribeConsumerArgs = { + path: string; + consumer: string; +} +export type IAlterTopicArgs = { + path: string; + alterPartitioningSettings?: ({ + setMinActivePartitions?: (number|Long|null); + setPartitionCountLimit?: (number|Long|null); + }|null); + setRetentionPeriod?: (google.protobuf.IDuration|null); + setRetentionStorageMb?: (number|Long|null); + setSupportedCodecs?: ({ + codecs?: (number[]|null); + }|null); + setPartitionWriteSpeedBytesPerSecond?: (number|Long|null); + setPartitionWriteBurstBytes?: (number|Long|null); + alterAttributes?: ({ [k: string]: string }|null); + addConsumers?: ({ + name?: (string|null); + important?: (boolean|null); + readFrom?: (google.protobuf.ITimestamp|null); + supportedCodecs?: ({ + codecs?: (number[]|null); + }|null); + attributes?: ({ [k: string]: string }|null); + consumerStats?: ({ + minPartitionsLastReadTime?: (google.protobuf.ITimestamp|null); + maxReadTimeLag?: (google.protobuf.IDuration|null); + maxWriteTimeLag?: (google.protobuf.IDuration|null); + bytesRead?: ({ + perMinute?: (number|Long|null); + perHour?: (number|Long|null); + perDay?: (number|Long|null); + }|null); + }|null); + }[]|null); + dropConsumers?: (string[]|null); + alterConsumers?: ({ + name: string; + setImportant?: (boolean|null); + setReadFrom?: (google.protobuf.ITimestamp|null); + setSupportedCodecs?: ({ + codecs?: (number[]|null); + }|null); + alterAttributes?: ({ [k: string]: string }|null); + }[]|null); + setMeteringMode?: (Ydb.Topic.MeteringMode|null); // UNSPECIFIED, RESERVED_CAPACITY, REQUEST_UNITS + +}; +export type IDropTopicArgs = { + path: string; +}; + +export type IOperationResult = { + readonly operation?: ({ + readonly id?: (string|null); + readonly ready?: (boolean|null); + readonly status?: (Ydb.StatusIds.StatusCode|null); + readonly issues?: (Ydb.Issue.IIssueMessage[]|null); + readonly result?: (google.protobuf.IAny|null); + readonly metadata?: (google.protobuf.IAny|null); + readonly costInfo?: (Ydb.ICostInfo|null); + }|null); +}; + +export class TopicClient { + private service?: InternalTopicClient; constructor(private settings: IClientSettings) { - super(); } /** @@ -38,37 +147,33 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t public destroy(_ctx: Context): void; @ensureContext(true) public async destroy(_ctx: Context): Promise { - // if (this.service) await this.service.destroy(); // TODO: service should be destroyed at the end + // TODO: Close opened readers and writers } // @ts-ignore - public createWriter(args: WriteStreamInitArgs): TopicWriter; - public createWriter(ctx: Context, args: WriteStreamInitArgs): TopicWriter; + public createWriter(args: InternalWriteStreamInitArgs): TopicWriter; + public createWriter(ctx: Context, args: InternalWriteStreamInitArgs): TopicWriter; @ensureContext(true) - public async createWriter(ctx: Context, args: WriteStreamInitArgs) { + public async createWriter(ctx: Context, args: InternalWriteStreamInitArgs) { if (args.getLastSeqNo === undefined) args = {...args, getLastSeqNo: true}; return new TopicWriter(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger); } // @ts-ignore - public createReader(args: ReadStreamInitArgs): TopicReader; - public createReader(ctx: Context, args: ReadStreamInitArgs): TopicReader; + public createReader(args: InternalReadStreamInitArgs): TopicReader; + public createReader(ctx: Context, args: InternalReadStreamInitArgs): TopicReader; @ensureContext(true) - public async createReader(ctx: Context, args: ReadStreamInitArgs) { + public async createReader(ctx: Context, args: InternalReadStreamInitArgs) { return new TopicReader(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger); } // TODO: Add commit a queue - same as in writer, to confirm commits // @ts-ignore - public commitOffset(request: CommitOffsetArgs): Promise; - public commitOffset(ctx: Context, request: CommitOffsetArgs): Promise; + public commitOffset(request: ICommitOffsetArgs): Promise; + public commitOffset(ctx: Context, request: ICommitOffsetArgs): Promise; @ensureContext(true) - // TODO: Add retryer - public async commitOffset(ctx: Context, request: CommitOffsetArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); - // if (!(typeof request.consumer === 'string' && request.consumer!.length > 0)) throw new Error('consumer is required'); - // if (!(typeof request.offset !== undefined && request.offset !== null)) throw new Error('offset is required'); + public async commitOffset(ctx: Context, request: ICommitOffsetArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).commitOffset(ctx, request); @@ -77,12 +182,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public updateOffsetsInTransaction(request: UpdateOffsetsInTransactionArgs): Promise; - public updateOffsetsInTransaction(ctx: Context, request: UpdateOffsetsInTransactionArgs): Promise; + public updateOffsetsInTransaction(request: IUpdateOffsetsInTransactionArgs): Promise; + public updateOffsetsInTransaction(ctx: Context, request: IUpdateOffsetsInTransactionArgs): Promise; @ensureContext(true) - public async updateOffsetsInTransaction(ctx: Context, request: UpdateOffsetsInTransactionArgs): Promise { - // if (!(request.topics && request.topics.length > 0)) throw new Error('topics is required'); - // if (!(typeof request.consumer === 'string' && request.consumer!.length > 0)) throw new Error('consumer is required'); + public async updateOffsetsInTransaction(ctx: Context, request: IUpdateOffsetsInTransactionArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).updateOffsetsInTransaction(ctx, request); @@ -91,11 +194,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public createTopic(request: CreateTopicArgs): Promise; - public createTopic(ctx: Context, request: CreateTopicArgs): Promise; + public createTopic(request: ICreateTopicArgs): Promise; + public createTopic(ctx: Context, request: ICreateTopicArgs): Promise; @ensureContext(true) - public async createTopic(ctx: Context, request: CreateTopicArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); + public async createTopic(ctx: Context, request: ICreateTopicArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).createTopic(ctx, request); @@ -104,11 +206,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public describeTopic(request: DescribeTopicArgs): Promise; - public describeTopic(ctx: Context, request: DescribeTopicArgs): Promise; + public describeTopic(request: IDescribeTopicArgs): Promise; + public describeTopic(ctx: Context, request: IDescribeTopicArgs): Promise; @ensureContext(true) - public async describeTopic(ctx: Context, request: DescribeTopicArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); + public async describeTopic(ctx: Context, request: IDescribeTopicArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).describeTopic(ctx, request); @@ -117,11 +218,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public describeConsumer(request: DescribeConsumerArgs): Promise; - public describeConsumer(ctx: Context, request: DescribeConsumerArgs): Promise; + public describeConsumer(request: IDescribeConsumerArgs): Promise; + public describeConsumer(ctx: Context, request: IDescribeConsumerArgs): Promise; @ensureContext(true) - public async describeConsumer(ctx: Context, request: DescribeConsumerArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); + public async describeConsumer(ctx: Context, request: IDescribeConsumerArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).describeConsumer(ctx, request); @@ -130,11 +230,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public alterTopic(request: AlterTopicArgs): Promise; - public alterTopic(ctx: Context, request: AlterTopicArgs): Promise; + public alterTopic(request: IAlterTopicArgs): Promise; + public alterTopic(ctx: Context, request: IAlterTopicArgs): Promise; @ensureContext(true) - public async alterTopic(ctx: Context, request: AlterTopicArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); + public async alterTopic(ctx: Context, request: IAlterTopicArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return /*await*/ asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).alterTopic(ctx, request); @@ -143,11 +242,10 @@ export class TopicClient extends EventEmitter { // TODO: Reconsider why I need t } // @ts-ignore - public dropTopic(request: DropTopicArgs): Promise; - public dropTopic(ctx: Context, request: DropTopicArgs): Promise; + public dropTopic(request: IDropTopicArgs): Promise; + public dropTopic(ctx: Context, request: IDropTopicArgs): Promise; @ensureContext(true) - public async dropTopic(ctx: Context, request: DropTopicArgs): Promise { - // if (!(typeof request.path === 'string' && request.path!.length > 0)) throw new Error('path is required'); + public async dropTopic(ctx: Context, request: IDropTopicArgs): Promise { return this.settings.retrier.retry(ctx, /*async*/ () => { return asIdempotentRetryableLambda(async () => { return /*await*/ (await this.nextNodeService()).dropTopic(ctx, request); diff --git a/src/topic/topic-reader.ts b/src/topic/topic-reader.ts index ca10cdff..e9e021b6 100644 --- a/src/topic/topic-reader.ts +++ b/src/topic/topic-reader.ts @@ -1,7 +1,7 @@ import { - ReadStreamInitArgs, - TopicReadStreamWithEvents -} from "./internal/topic-read-stream-with-events"; + InternalReadStreamInitArgs, + InternalTopicReadStream +} from "./internal/internal-topic-read-stream"; import DiscoveryService from "../discovery/discovery-service"; import {RetryLambdaResult, RetryStrategy} from "../retries/retryStrategy"; import {Context, CtxUnsubcribe, ensureContext} from "../context"; @@ -10,15 +10,7 @@ import {closeSymbol} from "./symbols"; import {google, Ydb} from "ydb-sdk-proto"; import Long from "long"; -type IReadResponseFields = Omit; -type IDataFields = Omit; -type IBatchFields = Omit; - -export class Message implements - IReadResponseFields, - IDataFields, - IBatchFields, - Ydb.Topic.StreamReadMessage.ReadResponse.IMessageData { +export class Message { // from IReadResponse bytesSize?: number | Long | null; @@ -41,7 +33,7 @@ export class Message implements uncompressedSize?: number | Long | null; constructor( - private innerReader: TopicReadStreamWithEvents, + private innerReader: InternalTopicReadStream, partition: Ydb.Topic.StreamReadMessage.ReadResponse.IPartitionData, batch: Ydb.Topic.StreamReadMessage.ReadResponse.IBatch, message: Ydb.Topic.StreamReadMessage.ReadResponse.IMessageData, @@ -84,7 +76,7 @@ export class TopicReader { private attemptPromiseReject?: (value: any) => void; private queue: Message[] = []; private waitNextResolve?: (value: unknown) => void; - private innerReadStream?: TopicReadStreamWithEvents; + private innerReadStream?: InternalTopicReadStream; private closePromise?: Promise; private _messages?: { [Symbol.asyncIterator]: () => AsyncGenerator }; @@ -120,7 +112,7 @@ export class TopicReader { return this._messages!; } - constructor(private ctx: Context, private readStreamArgs: ReadStreamInitArgs, private retrier: RetryStrategy, private discovery: DiscoveryService, private logger: Logger) { + constructor(private ctx: Context, private readStreamArgs: InternalReadStreamInitArgs, private retrier: RetryStrategy, private discovery: DiscoveryService, private logger: Logger) { logger.trace('%s: new TopicReader', ctx); if (!(readStreamArgs.receiveBufferSizeInBytes > 0)) throw new Error('receivingBufferSize must be greater than 0'); let onCancelUnsub: CtxUnsubcribe; @@ -165,7 +157,7 @@ export class TopicReader { private async initInnerStream(ctx: Context) { this.logger.trace('%s: TopicReader.initInnerStream()', ctx); - this.innerReadStream = new TopicReadStreamWithEvents(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); + this.innerReadStream = new InternalTopicReadStream(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); // this.innerReadStream.events.on('initResponse', async (resp) => { // try { @@ -271,7 +263,7 @@ export class TopicReader { } }); - this.innerReadStream.readRequest(ctx,{ + this.innerReadStream.readRequest(ctx, { bytesSize: this.readStreamArgs.receiveBufferSizeInBytes, }); } diff --git a/src/topic/topic-writer.ts b/src/topic/topic-writer.ts index e38da8d4..7f4dafb8 100644 --- a/src/topic/topic-writer.ts +++ b/src/topic/topic-writer.ts @@ -1,23 +1,35 @@ import { - TopicWriteStreamWithEvents, - WriteStreamInitArgs, - WriteStreamWriteArgs, WriteStreamWriteResult -} from "./internal/topic-write-stream-with-events"; + InternalTopicWriteStream, + InternalWriteStreamInitArgs, +} from "./internal/internal-topic-write-stream"; import {Logger} from "../logger/simple-logger"; import {RetryLambdaResult, RetryStrategy} from "../retries/retryStrategy"; import {Context, CtxUnsubcribe, ensureContext} from "../context"; import Long from "long"; import {closeSymbol} from "./symbols"; -import {Ydb} from "ydb-sdk-proto"; +import {google, Ydb} from "ydb-sdk-proto"; import DiscoveryService from "../discovery/discovery-service"; -type SendMessagesResult = - Omit - & Ydb.Topic.StreamWriteMessage.WriteResponse.IWriteAck; +export interface SendArgs { + messages: ({ + data: Uint8Array; + seqNo?: (number|Long|null); + createdAt?: (google.protobuf.ITimestamp|null); + uncompressedSize?: (number|Long|null); + messageGroupId?: (string|null); + partitionId?: (number|Long|null); + metadataItems?: (Ydb.Topic.IMetadataItem[]|null); + }[]|null); + codec?: (number|null); + tx?: (Ydb.Topic.ITransactionIdentity|null); +} + +export type SendResult = { +} type messageQueueItem = { - args: WriteStreamWriteArgs, - resolve: (value: SendMessagesResult | PromiseLike) => void, + args: SendArgs, + resolve: (value: SendResult | PromiseLike) => void, reject: (reason?: any) => void }; @@ -29,11 +41,11 @@ export class TopicWriter { private getLastSeqNo?: boolean; // true if client to proceed sequence based on last known seqNo private lastSeqNo?: Long.Long; private attemptPromiseReject?: (value: any) => void; - private innerWriteStream?: TopicWriteStreamWithEvents; + private innerWriteStream?: InternalTopicWriteStream; constructor( ctx: Context, - private writeStreamArgs: WriteStreamInitArgs, + private writeStreamArgs: InternalWriteStreamInitArgs, private retrier: RetryStrategy, private discovery: DiscoveryService, private logger: Logger) { @@ -95,7 +107,7 @@ export class TopicWriter { delete this.writeStreamArgs.getLastSeqNo; } delete this.firstInnerStreamInitResp; - const stream = new TopicWriteStreamWithEvents(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); + const stream = new InternalTopicWriteStream(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); stream.events.on('initResponse', (resp) => { this.logger.trace('%s: TopicWriter.on "initResponse"', ctx); try { @@ -191,10 +203,10 @@ export class TopicWriter { } // @ts-ignore - public sendMessages(sendMessagesArgs: WriteStreamWriteArgs): Promise; - public sendMessages(ctx: Context, sendMessagesArgs: WriteStreamWriteArgs): Promise; + public send(sendMessagesArgs: SendArgs): Promise; + public send(ctx: Context, sendMessagesArgs: SendArgs): Promise; @ensureContext(true) - public sendMessages(ctx: Context, sendMessagesArgs: WriteStreamWriteArgs): Promise { + public send(ctx: Context, sendMessagesArgs: SendArgs): Promise { this.logger.trace('%s: TopicWriter.sendMessages()', ctx); if (this.reasonForClose) return Promise.reject(this.reasonForClose); sendMessagesArgs.messages?.forEach((msg) => { @@ -207,7 +219,7 @@ export class TopicWriter { if (msg.seqNo === undefined || msg.seqNo === null) throw new Error('Writer was created without getLastSeqNo = true, explicit seqNo must be provided'); } }); - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.messageQueue.push({args: sendMessagesArgs, resolve, reject}) this.innerWriteStream?.writeRequest(ctx, sendMessagesArgs); });