diff --git a/src/__tests__/e2e/topic-service/topic-service.test.ts b/src/__tests__/e2e/topic-service/topic-service.test.ts index 18321841..4133cb04 100644 --- a/src/__tests__/e2e/topic-service/topic-service.test.ts +++ b/src/__tests__/e2e/topic-service/topic-service.test.ts @@ -21,30 +21,25 @@ describe('Topic: General', () => { if (topicService) topicService.dispose(); }); - it.only('write: simple', async () => { - let waitResolve: any, waitPromise: Promise; - + it('general', async () => { await topicService.createTopic({ - path: 'myTopic2', + path: 'myTopic', }); console.info(`Service created`); - const writer = await topicService.openWriteStream({ - path: 'myTopic2', + const writer = await topicService.openWriteStreamWithEvent({ + path: 'myTopic', }); writer.events.on('error', (err) => { - console.error(err); + console.error('Writer error:', err); }); console.info(`Topic writer created`); - waitPromise = new Promise((resolve) => { - waitResolve = resolve; - }); - writer.events.on('initResponse', (_v) => { - waitResolve(); + await stepResult(`Writer initialized`, (resolve) => { + writer.events.once('initResponse', (_v) => { + resolve(undefined); + }); }); - await waitPromise; - console.info(`Writer initialized`); await writer.writeRequest({ // tx: @@ -62,64 +57,63 @@ describe('Topic: General', () => { // metadataItems: // TODO: Should I use this? }], }); - - waitPromise = new Promise((resolve) => { - waitResolve = resolve; - }); - writer.events.on("writeResponse", (_v) => { - waitResolve(); + await stepResult(`Message sent`, (resolve) => { + writer.events.once("writeResponse", (_v) => { + resolve(undefined); + }); }); - await waitPromise; - console.info(`Message sent`); - await writer.dispose(); - console.info(`Writer disposed`); + writer.close(); + await stepResult(`Writer closed`, (resolve) => { + writer.events.once("end", () => { + resolve(undefined); + }); + }); ///////////////////////////////////////////////// // Now read the message - const reader = await topicService.openReadStream({ - readerName: 'reasder1', - consumer: 'testC', - topicsReadSettings: [{ - path: 'myTopic2', - // partitionIds: [1], - }], - }); - reader.events.on('error', (err) => { - console.error(err); - }); - - waitPromise = new Promise((resolve) => { - waitResolve = resolve; - }); - reader.events.on('initResponse', (_v) => { - waitResolve(); - }); - await waitPromise; - console.info(`Topic reader created`); - - // reader.readRequest({ + // const reader = await topicService.openReadStreamWithEvents({ + // readerName: 'reader1', + // consumer: 'testC', + // topicsReadSettings: [{ + // path: 'myTopic2', + // partitionIds: [1], + // }], + // }); + // reader.events.on('error', (err) => { + // console.error('Reader error:', err); // }); // - // waitPromise = new Promise((resolve) => { - // waitResolve = resolve; + // await stepResult(`Topic reader created`, (resolve) => { + // reader.events.once("initResponse", () => { + // resolve(undefined); + // }); // }); - // reader.events.on('readResponse', (v) => { - // console.info(`Message read: ${v}`) - // waitResolve(); + // + // await stepResult(`Start partition`, (resolve) => { + // reader.events.once('startPartitionSessionRequest', (v) => { + // console.info(`Partition: ${v}`) + // reader.startPartitionSessionResponse({ + // partitionSessionId: v.partitionSession?.partitionSessionId, + // }); + // resolve(undefined); + // }); + // }); + // + // await stepResult(`Message read`, (resolve) => { + // reader.events.once('readResponse', (v) => { + // console.info(`Message: ${v}`) + // resolve(undefined); + // }); + // }); + // + // reader.close(); + // await stepResult(`Reader closed`, (resolve) => { + // reader.events.once("end", () => { + // resolve(undefined); + // }); // }); - // await waitPromise; - - await reader.dispose(); - console.info(`Reader disposed`); - - await topicService.dispose(); - console.info(`Topic service disposed`); - }); - - it('read: simple', async () => { - }); async function testOnOneSessionWithoutDriver() { @@ -140,4 +134,16 @@ describe('Topic: General', () => { logger, ); } + + async function stepResult(message: String, cb: (resolve: (value: T | PromiseLike) => void, reject: (reason?: any) => void) => T): Promise { + return new Promise((resolve, reject) => { + try { + cb(resolve, reject); + console.info(message); + } catch (err) { + reject(err); + console.error('Step failed:', err); + } + }); + } }); diff --git a/src/driver.ts b/src/driver.ts index 264502fa..0ef1f7c9 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -11,6 +11,7 @@ import {parseConnectionString} from "./utils/parse-connection-string"; import {QueryClient} from "./query"; import {Logger} from "./logger/simple-logger"; import {getDefaultLogger} from "./logger/get-default-logger"; +import {TopicService} from "./topic"; export interface IPoolSettings { minLimit?: number; @@ -38,10 +39,25 @@ export default class Driver { private clientOptions?: ClientOptions; private logger: Logger; private discoveryService: DiscoveryService; + private _topicClient?: TopicService; - public tableClient: TableClient; - public queryClient: QueryClient; - public schemeClient: SchemeService; + public readonly tableClient: TableClient; + public readonly queryClient: QueryClient; + public readonly schemeClient: SchemeService; + + public async getTopicClient() { + if (!this._topicClient) { + this._topicClient = new TopicService( + await this.discoveryService.getEndpoint(), + this.database, + this.authService, + this.logger, + this.sslCredentials, + this.clientOptions, + ); + } + return this._topicClient; + } constructor(settings: IDriverSettings) { this.logger = settings.logger || getDefaultLogger(); diff --git a/src/topic/README.md b/src/topic/README.md index 006a6dad..2a7713c8 100644 --- a/src/topic/README.md +++ b/src/topic/README.md @@ -6,9 +6,8 @@ Notice: This API is EXPERIMENTAL and may be changed or removed in a later releas # TODO -- Internal service implementation -- Client - Retryer -- Update token on streams +- Promise like streams wrappers +- Update auth token on streams - Compression - Transactions diff --git a/src/topic/topic-read-stream.ts b/src/topic/topic-read-stream-with-event.ts similarity index 86% rename from src/topic/topic-read-stream.ts rename to src/topic/topic-read-stream-with-event.ts index 0e23c97a..120ced9c 100644 --- a/src/topic/topic-read-stream.ts +++ b/src/topic/topic-read-stream-with-event.ts @@ -41,8 +41,6 @@ export type ReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.ISt export type ReadStreamStopPartitionSessionResult = Ydb.Topic.StreamReadMessage.IStopPartitionSessionResponse; // & Required>; -export const STREAM_DESTROYED = 'stream-destroyed'; - type ReadStreamEvents = { initResponse: (resp: ReadStreamInitResult) => void, readResponse: (resp: ReadStreamReadResult) => void, @@ -50,11 +48,8 @@ type ReadStreamEvents = { partitionSessionStatusResponse: (resp: ReadStreamPartitionSessionStatusResult) => void, startPartitionSessionRequest: (resp: ReadStreamStartPartitionSessionArgs) => void, stopPartitionSessionRequest: (resp: ReadStreamStopPartitionSessionArgs) => void, - updateTokenResponse: (resp: ReadStreamUpdateTokenResult) => void, - error: (err: Error) => void, - 'stream-destroyed': (stream: { dispose: () => {} }) => void, // TODO: Is end is not enough end: (cause: any) => void, } @@ -65,7 +60,7 @@ export const enum TopicWriteStreamState { Closed } -export class TopicReadStream { +export class TopicReadStreamWithEvent { public events = new EventEmitter() as TypedEmitter; private _state: TopicWriteStreamState = TopicWriteStreamState.Init; @@ -86,8 +81,16 @@ export class TopicReadStream { (v: Ydb.Topic.StreamReadMessage.IFromClient) => Ydb.Topic.StreamReadMessage.FromClient.encode(v).finish() as Buffer, Ydb.Topic.StreamReadMessage.FromServer.decode, this.topicService.metadata); + + //// Uncomment to see all events + const stream = this.readBidiStream; + const oldEmit = stream.emit; + stream.emit = ((...args) => { + console.info('read event:', args); + return oldEmit.apply(stream, args as unknown as ['readable']); + }) as typeof oldEmit; + this.readBidiStream.on('data', (value) => { - console.info(2000, value) try { YdbError.checkStatus(value!) } catch (err) { @@ -108,29 +111,22 @@ export class TopicReadStream { if (TransportError.isMember(err)) err = TransportError.convertToYdbError(err); this.events.emit('error', err); }) - // this.writeBidiStream.on('status', (v) => { - // console.info(8200, v); - // }) - // this.writeBidiStream.on('metadata', (v) => { - // console.info(8000, v); - // }) - // this.writeBidiStream.on('finish', (v: any) => { - // console.info(8060, v); - // }) - + this.readBidiStream.on('end', () => { + this._state = TopicWriteStreamState.Closed; + delete this.readBidiStream; // so there was no way to send more messages + }); this.initRequest(opts); }; private initRequest(opts: ReadStreamInitArgs) { - if (!this.readBidiStream) throw new Error('Stream is not opened') - this.readBidiStream.write( + this.readBidiStream!.write( Ydb.Topic.StreamReadMessage.create({ initRequest: Ydb.Topic.StreamReadMessage.InitRequest.create(opts), })); } public readRequest(opts: ReadStreamReadArgs) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ readRequest: Ydb.Topic.StreamReadMessage.ReadRequest.create(opts), @@ -138,7 +134,7 @@ export class TopicReadStream { } public commitOffsetRequest(opts: ReadStreamCommitOffsetArgs) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ commitOffsetRequest: Ydb.Topic.StreamReadMessage.CommitOffsetRequest.create(opts), @@ -146,7 +142,7 @@ export class TopicReadStream { } public partitionSessionStatusRequest(opts: ReadStreamPartitionSessionStatusArgs) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ partitionSessionStatusRequest: Ydb.Topic.StreamReadMessage.PartitionSessionStatusRequest.create(opts), @@ -154,7 +150,7 @@ export class TopicReadStream { } public updateTokenRequest(opts: ReadStreamUpdateTokenArgs) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(opts), @@ -162,7 +158,7 @@ export class TopicReadStream { } public startPartitionSessionResponse(opts: ReadStreamStartPartitionSessionResult) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ startPartitionSessionResponse: Ydb.Topic.StreamReadMessage.StartPartitionSessionResponse.create(opts), @@ -170,7 +166,7 @@ export class TopicReadStream { } public stopPartitionSessionResponse(opts: ReadStreamStopPartitionSessionResult) { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') this.readBidiStream.write( Ydb.Topic.StreamReadMessage.FromClient.create({ stopPartitionSessionResponse: Ydb.Topic.StreamReadMessage.StopPartitionSessionResponse.create(opts), @@ -178,15 +174,14 @@ export class TopicReadStream { } public async close() { - if (!this.readBidiStream) throw new Error('Stream is not opened') + if (!this.readBidiStream) throw new Error('Stream is closed') + this._state = TopicWriteStreamState.Closing; this.readBidiStream.end(); delete this.readBidiStream; // so there was no way to send more messages - // TODO: Is there a way to keep waiting for later ACKs? } public async dispose() { await this.close(); - this.events.emit(STREAM_DESTROYED, this); this._state = TopicWriteStreamState.Closed; } diff --git a/src/topic/topic-service.ts b/src/topic/topic-service.ts index f23856f9..12cc7de9 100644 --- a/src/topic/topic-service.ts +++ b/src/topic/topic-service.ts @@ -6,14 +6,13 @@ import ICreateTopicResult = Ydb.Topic.ICreateTopicResult; import {AuthenticatedService, ClientOptions} from "../utils"; import {IAuthService} from "../credentials/i-auth-service"; import {ISslCredentials} from "../utils/ssl-credentials"; -import {TopicWriteStream, STREAM_DESTROYED, WriteStreamInitArgs} from "./topic-write-stream"; -import {TopicReadStream, ReadStreamInitArgs} from "./topic-read-stream"; +import {TopicWriteStreamWithEvent, WriteStreamInitArgs} from "./topic-write-stream-with-event"; +import {TopicReadStreamWithEvent, ReadStreamInitArgs} from "./topic-read-stream-with-event"; -// TODO: Typed events // TODO: Proper stream close/dispose and a reaction on end of stream from server // TODO: Retries with the same options // TODO: Batches -// TODO: Zip +// TODO: Zip compression // TODO: Sync queue // TODO: Make as close as posible to pythone API // TODO: Regular auth token update @@ -46,7 +45,7 @@ type DropTopicResult = Ydb.Topic.DropTopicResponse; export class TopicService extends AuthenticatedService implements ICreateTopicResult { public endpoint: Endpoint; private readonly logger: Logger; - private allStreams: { dispose(): void }[] = []; + private allStreams: { close(): void }[] = []; constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { const host = endpoint.toString(); @@ -55,30 +54,30 @@ export class TopicService extends AuthenticatedService { - s.dispose() + s.close() }); } - public async openWriteStream(opts: WriteStreamInitArgs) { + public async openWriteStreamWithEvent(opts: WriteStreamInitArgs) { await this.updateMetadata(); // TODO: Check for update on every message - const writerStream = new TopicWriteStream(opts, this, this.logger); - writerStream.events.once(STREAM_DESTROYED, (stream: { dispose: () => {} }) => { - const index = this.allStreams.findIndex(v => v === stream) + const writerStream = new TopicWriteStreamWithEvent(opts, this, this.logger); + writerStream.events.once('end', () => { + const index = this.allStreams.findIndex(v => v === writerStream) if (index >= 0) this.allStreams.splice(index, 1); }); this.allStreams.push(writerStream); // TODO: Is is possible to have multiple streams in a time? I.e. while server errors return writerStream; } - public async openReadStream(opts: ReadStreamInitArgs) { + public async openReadStreamWithEvents(opts: ReadStreamInitArgs) { await this.updateMetadata(); // TODO: Check for update on every message - const readStream = new TopicReadStream(opts, this, this.logger); - readStream.events.once(STREAM_DESTROYED, (stream: { dispose: () => {} }) => { - const index = this.allStreams.findIndex(v => v === stream) + const readStream = new TopicReadStreamWithEvent(opts, this, this.logger); + readStream.events.once('end', () => { + const index = this.allStreams.findIndex(v => v === readStream) if (index >= 0) this.allStreams.splice(index, 1); }); this.allStreams.push(readStream); // TODO: Is is possible to have multiple streams in a time? I.e. while server errors diff --git a/src/topic/topic-write-stream.ts b/src/topic/topic-write-stream-with-event.ts similarity index 77% rename from src/topic/topic-write-stream.ts rename to src/topic/topic-write-stream-with-event.ts index 716fee54..635a13f3 100644 --- a/src/topic/topic-write-stream.ts +++ b/src/topic/topic-write-stream-with-event.ts @@ -27,17 +27,12 @@ export type WriteStreamUpdateTokenResult = Readonly; // & Required>; -export const STREAM_DESTROYED = 'stream-destroyed'; - type WriteStreamEvents = { initResponse: (resp: WriteStreamInitResult) => void, writeResponse: (resp: WriteStreamWriteResult) => void, - updateTokenResponse: (resp: WriteStreamUpdateTokenResult) => void, - error: (err: Error) => void, - 'stream-destroyed': (stream: { dispose: () => {} }) => void, // TODO: Is end is not enough - end: (cause: any) => void, + end: () => void, } export const enum TopicWriteStreamState { @@ -47,7 +42,7 @@ export const enum TopicWriteStreamState { Closed } -export class TopicWriteStream { +export class TopicWriteStreamWithEvent { public events = new EventEmitter() as TypedEmitter; private _state: TopicWriteStreamState = TopicWriteStreamState.Init; @@ -69,6 +64,14 @@ export class TopicWriteStream { Ydb.Topic.StreamWriteMessage.FromServer.decode, this.topicService.metadata); + //// Uncomment to see all events + const stream = this.writeBidiStream; + const oldEmit = stream.emit; + stream.emit = ((...args) => { + console.info('write event:', args); + return oldEmit.apply(stream, args as unknown as ['readable']); + }) as typeof oldEmit; + this.writeBidiStream.on('data', (value) => { try { YdbError.checkStatus(value!) @@ -80,34 +83,30 @@ export class TopicWriteStream { else if (value!.initResponse) { this._state = TopicWriteStreamState.Active; this.events.emit('initResponse', value!.initResponse!); - } else if (value!.updateTokenResponse) this.events.emit('writeResponse', value!.updateTokenResponse!); - }) + } else if (value!.updateTokenResponse) this.events.emit('updateTokenResponse', value!.updateTokenResponse!); + }); this.writeBidiStream.on('error', (err) => { if (TransportError.isMember(err)) err = TransportError.convertToYdbError(err); this.events.emit('error', err); - }) - // this.writeBidiStream.on('status', (v) => { - // console.info(8200, v); - // }) - // this.writeBidiStream.on('metadata', (v) => { - // console.info(8000, v); - // }) - // this.writeBidiStream.on('finish', (v: any) => { - // console.info(8060, v); - // }) - this.initRequest(opts); // TODO: Think of retry cycle + }); + this.writeBidiStream.on('end', () => { + console.info(3000) + this._state = TopicWriteStreamState.Closed; + delete this.writeBidiStream; // so there was no way to send more messages + setTimeout(() => this.events.emit('end'), 0); + }); + this.initRequest(opts); }; private initRequest(opts: WriteStreamInitArgs) { - if (!this.writeBidiStream) throw new Error('Stream is not opened') - this.writeBidiStream.write( + this.writeBidiStream!.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ initRequest: Ydb.Topic.StreamWriteMessage.InitRequest.create(opts), })); } public writeRequest(opts: WriteStreamWriteArgs) { - if (!this.writeBidiStream) throw new Error('Stream is not opened') + if (!this.writeBidiStream) throw new Error('Stream is closed') this.writeBidiStream.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(opts), @@ -115,25 +114,22 @@ export class TopicWriteStream { } public updateTokenRequest(opts: WriteStreamUpdateTokenArgs) { - if (!this.writeBidiStream) throw new Error('Stream is not opened') + if (!this.writeBidiStream) throw new Error('Stream is closed') this.writeBidiStream.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(opts), })); } - public async close() { - if (!this.writeBidiStream) throw new Error('Stream is not opened') + public close() { + if (!this.writeBidiStream) return; + this._state = TopicWriteStreamState.Closing; this.writeBidiStream.end(); delete this.writeBidiStream; // so there was no way to send more messages - // TODO: Is there a way to keep waiting for later ACKs? + // TODO: Should be a way to keep waiting for later ACKs? } - public async dispose() { - await this.close(); - this.events.emit(STREAM_DESTROYED, this); - this._state = TopicWriteStreamState.Closed; - } + // TODO: Add [dispose] that call close() // TODO: Update token when the auth provider returns a new one }