From 0e1de988c92237995cc82ffc0db92dbb9c019dec Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 23 Jul 2024 15:16:32 +0300 Subject: [PATCH] chore: initial MVP of Topic service --- .../e2e/topic-service/non-jest-test.ts | 112 ++++++++++++++++ .../e2e/topic-service/topic-service.test.ts | 120 +++++++++++++----- src/topic/topic-read-stream.ts | 101 ++++++++------- src/topic/topic-service.ts | 3 +- src/topic/topic-write-stream.ts | 77 +++++------ 5 files changed, 290 insertions(+), 123 deletions(-) create mode 100644 src/__tests__/e2e/topic-service/non-jest-test.ts diff --git a/src/__tests__/e2e/topic-service/non-jest-test.ts b/src/__tests__/e2e/topic-service/non-jest-test.ts new file mode 100644 index 00000000..9a597739 --- /dev/null +++ b/src/__tests__/e2e/topic-service/non-jest-test.ts @@ -0,0 +1,112 @@ +import {getDefaultLogger} from "../../../logger/get-default-logger"; +import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service"; +import DiscoveryService from "../../../discovery/discovery-service"; +import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants"; +import {TopicService} from "../../../topic"; +// @ts-ignore +import {google, Ydb} from "ydb-sdk-proto"; + +const DATABASE = '/local'; +const ENDPOINT = 'grpc://localhost:2136'; + +async function testOnOneSessionWithoutDriver() { + const logger = getDefaultLogger(); + const authService = new AnonymousAuthService(); + const discoveryService = new DiscoveryService({ + endpoint: ENDPOINT, + database: DATABASE, + authService, + discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD, + logger, + }); + await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD); + const topicService = new TopicService( + await discoveryService.getEndpoint(), // TODO: Should be one per endpoint + DATABASE, + authService, + logger, + ); + return topicService; +} + +(async () => { + const topicService = await testOnOneSessionWithoutDriver(); + + console.info(1000); + await topicService.createTopic({ + path: 'MyTopic', + }); + + const writer = await topicService.openWriteStream({ + path: 'MyTopic', + }); + + // @ts-ignore + let waitResolve: any; + // @ts-ignore + let waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }) + + writer.events.on('initResponse', (v) => { + console.info(3900, v); + waitResolve(); + }); + + waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }) + writer.events.on("writeResponse", (v) => { + console.info(4000, v); + // waitResolve(); + }) + + writer.writeRequest({ + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.alloc(1000, 'test messsage'), + uncompressedSize: 'test messsage'.length, + seqNo: 1, + createdAt: google.protobuf.Timestamp.create({ + seconds: Date.now() / 1000, + nanos: Date.now() % 1000, + }), + messageGroupId: 'abc', // TODO: Check examples + partitionId: 1, + // metadataItems: // TODO: Should I use this? + }], + }); + + console.info(4100); + + // await new Promise((resolve) => setTimeout(resolve, 4_000)); + + // console.info(1300); + // + await writer.dispose(); + + // ----- + + const reader = await topicService.openReadStream({ + readerName: 'reasder1', + consumer: 'testConsumer', + topicsReadSettings: [{ + path: 'MyTopic', + partitionIds: [1], + }], + }); + + waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }) + reader.events.on("readResponse", (data) => { + console.info(`Read from myTopic ${JSON.stringify(data, null, 2)}`); + waitResolve(); + }); + + await waitPromise; + + await reader.dispose(); + + console.info(`Test completed`); +})(); diff --git a/src/__tests__/e2e/topic-service/topic-service.test.ts b/src/__tests__/e2e/topic-service/topic-service.test.ts index 83ecebec..18321841 100644 --- a/src/__tests__/e2e/topic-service/topic-service.test.ts +++ b/src/__tests__/e2e/topic-service/topic-service.test.ts @@ -3,7 +3,7 @@ import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants"; import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service"; import {getDefaultLogger} from "../../../logger/get-default-logger"; import {TopicService} from "../../../topic"; -// import {google, Ydb} from "ydb-sdk-proto"; +import {google, Ydb} from "ydb-sdk-proto"; const DATABASE = '/local'; const ENDPOINT = 'grpc://localhost:2136'; @@ -22,46 +22,100 @@ describe('Topic: General', () => { }); it.only('write: simple', async () => { - console.info(1000); + let waitResolve: any, waitPromise: Promise; + await topicService.createTopic({ - path: 'MyTopic', + path: 'myTopic2', }); + console.info(`Service created`); const writer = await topicService.openWriteStream({ - path: 'MyTopic', + path: 'myTopic2', + }); + writer.events.on('error', (err) => { + console.error(err); + }); + console.info(`Topic writer created`); + + waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }); + writer.events.on('initResponse', (_v) => { + waitResolve(); }); + await waitPromise; + console.info(`Writer initialized`); - // expect() - - // writer.events.on('initResponse', (resp) => { - // resp. - // }) - - // writer.write(Ydb.Topic.StreamWriteMessage.WriteRequest.create({ - // messages: [ - // Ydb.Topic.StreamWriteMessage.WriteRequest.MessageData.create({ - // seqNo: 1, - // createdAt: google.protobuf.Timestamp.create({ - // seconds: 100, - // nanos: 0, - // }), - // // metadataItems: [ - // // Ydb.Topic.MetadataItem.create({ - // // key: 'a', - // // value: [0, 1], - // // }), - // // ] - // // uncompressedSize: 100, - // // data: new Buffer(), - // }), - // ], - // })); - - await new Promise((resolve) => setTimeout(resolve, 4_000)); - - console.info(1300); + await writer.writeRequest({ + // tx: + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.alloc(100, '1234567890'), + uncompressedSize: '1234567890'.length, + seqNo: 1, + createdAt: google.protobuf.Timestamp.create({ + seconds: Date.now() / 1000, + nanos: Date.now() % 1000, + }), + messageGroupId: 'abc', // TODO: Check examples + partitionId: 1, + // metadataItems: // TODO: Should I use this? + }], + }); + + waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }); + writer.events.on("writeResponse", (_v) => { + waitResolve(); + }); + await waitPromise; + console.info(`Message sent`); await writer.dispose(); + console.info(`Writer disposed`); + + ///////////////////////////////////////////////// + // Now read the message + + const reader = await topicService.openReadStream({ + readerName: 'reasder1', + consumer: 'testC', + topicsReadSettings: [{ + path: 'myTopic2', + // partitionIds: [1], + }], + }); + reader.events.on('error', (err) => { + console.error(err); + }); + + waitPromise = new Promise((resolve) => { + waitResolve = resolve; + }); + reader.events.on('initResponse', (_v) => { + waitResolve(); + }); + await waitPromise; + console.info(`Topic reader created`); + + // reader.readRequest({ + // }); + // + // waitPromise = new Promise((resolve) => { + // waitResolve = resolve; + // }); + // reader.events.on('readResponse', (v) => { + // console.info(`Message read: ${v}`) + // waitResolve(); + // }); + // await waitPromise; + + await reader.dispose(); + console.info(`Reader disposed`); + + await topicService.dispose(); + console.info(`Topic service disposed`); }); it('read: simple', async () => { diff --git a/src/topic/topic-read-stream.ts b/src/topic/topic-read-stream.ts index 672db897..0e23c97a 100644 --- a/src/topic/topic-read-stream.ts +++ b/src/topic/topic-read-stream.ts @@ -1,44 +1,44 @@ import {Logger} from "../logger/simple-logger"; import {Ydb} from "ydb-sdk-proto"; -import {ClientWritableStream/*, ServiceError*/} from "@grpc/grpc-js/build/src/call"; import EventEmitter from "events"; import {TransportError, YdbError} from "../errors"; import TypedEmitter from "typed-emitter/rxjs"; import {TopicService} from "./topic-service"; +import {ClientDuplexStream} from "@grpc/grpc-js/build/src/call"; -export type ReadStreamInitArgs = Ydb.Topic.StreamReadMessage.InitRequest; +export type ReadStreamInitArgs = Ydb.Topic.StreamReadMessage.IInitRequest; // & Required>; -export type ReadStreamInitResult = Ydb.Topic.StreamReadMessage.InitResponse; +export type ReadStreamInitResult = Ydb.Topic.StreamReadMessage.IInitResponse; // & Required>; -export type ReadStreamReadArgs = Ydb.Topic.StreamReadMessage.ReadRequest; +export type ReadStreamReadArgs = Ydb.Topic.StreamReadMessage.IReadRequest; // & Required>; -export type ReadStreamReadResult = Ydb.Topic.StreamReadMessage.ReadResponse; +export type ReadStreamReadResult = Ydb.Topic.StreamReadMessage.IReadResponse; // & Required>; -export type ReadStreamCommitOffsetArgs = Ydb.Topic.StreamReadMessage.CommitOffsetRequest; +export type ReadStreamCommitOffsetArgs = Ydb.Topic.StreamReadMessage.ICommitOffsetRequest; // & Required>; -export type ReadStreamCommitOffsetResult = Ydb.Topic.StreamReadMessage.CommitOffsetResponse; +export type ReadStreamCommitOffsetResult = Ydb.Topic.StreamReadMessage.ICommitOffsetResponse; // & Required>; -export type ReadStreamPartitionSessionStatusArgs = Ydb.Topic.StreamReadMessage.PartitionSessionStatusRequest; +export type ReadStreamPartitionSessionStatusArgs = Ydb.Topic.StreamReadMessage.IPartitionSessionStatusRequest; // & Required>; -export type ReadStreamPartitionSessionStatusResult = Ydb.Topic.StreamReadMessage.PartitionSessionStatusResponse; +export type ReadStreamPartitionSessionStatusResult = Ydb.Topic.StreamReadMessage.IPartitionSessionStatusResponse; // & Required>; -export type ReadStreamUpdateTokenArgs = Ydb.Topic.UpdateTokenRequest; +export type ReadStreamUpdateTokenArgs = Ydb.Topic.IUpdateTokenRequest; // & Required>; -export type ReadStreamUpdateTokenResult = Ydb.Topic.UpdateTokenResponse; +export type ReadStreamUpdateTokenResult = Ydb.Topic.IUpdateTokenResponse; // & Required>; -export type ReadStreamStartPartitionSessionArgs = Ydb.Topic.StreamReadMessage.StartPartitionSessionRequest; +export type ReadStreamStartPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStartPartitionSessionRequest; // & Required>; -export type ReadStreamStartPartitionSessionResult = Ydb.Topic.StreamReadMessage.StartPartitionSessionResponse; +export type ReadStreamStartPartitionSessionResult = Ydb.Topic.StreamReadMessage.IStartPartitionSessionResponse; // & Required>; -export type ReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.StopPartitionSessionRequest; +export type ReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.IStopPartitionSessionRequest; // & Required>; -export type ReadStreamStopPartitionSessionResult = Ydb.Topic.StreamReadMessage.StopPartitionSessionResponse; +export type ReadStreamStopPartitionSessionResult = Ydb.Topic.StreamReadMessage.IStopPartitionSessionResponse; // & Required>; export const STREAM_DESTROYED = 'stream-destroyed'; @@ -73,7 +73,7 @@ export class TopicReadStream { return this._state; } - public readBidiStream?: ClientWritableStream; + public readBidiStream?: ClientDuplexStream; constructor( opts: ReadStreamInitArgs, @@ -81,33 +81,43 @@ export class TopicReadStream { // @ts-ignore private _logger: Logger) { this.readBidiStream = this.topicService.grpcServiceClient! - .makeClientStreamRequest( + .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamRead', (v: Ydb.Topic.StreamReadMessage.IFromClient) => Ydb.Topic.StreamReadMessage.FromClient.encode(v).finish() as Buffer, Ydb.Topic.StreamReadMessage.FromServer.decode, - this.topicService.metadata, - (err: any /* ServiceError */, value?: Ydb.Topic.StreamReadMessage.FromServer) => { - try { - if (TransportError.isMember(err)) throw TransportError.convertToYdbError(err); - if (err) throw err; - YdbError.checkStatus(value!) - } catch (err) { - // TODO: Process end of stream - this.events.emit('error', err as Error); - return; - } - - // TODO: Optimize selection - if (value!.readResponse) this.events.emit('readResponse', value!.readResponse! as Ydb.Topic.StreamReadMessage.ReadResponse); - else if (value!.initResponse) { - this._state = TopicWriteStreamState.Active; - this.events.emit('initResponse', value!.initResponse! as Ydb.Topic.StreamReadMessage.InitResponse); - } else if (value!.commitOffsetResponse) this.events.emit('commitOffsetResponse', value!.commitOffsetResponse! as Ydb.Topic.StreamReadMessage.CommitOffsetResponse); - else if (value!.partitionSessionStatusResponse) this.events.emit('partitionSessionStatusResponse', value!.partitionSessionStatusResponse! as Ydb.Topic.StreamReadMessage.PartitionSessionStatusResponse); - else if (value!.startPartitionSessionRequest) this.events.emit('startPartitionSessionRequest', value!.startPartitionSessionRequest! as Ydb.Topic.StreamReadMessage.StartPartitionSessionRequest); - else if (value!.stopPartitionSessionRequest) this.events.emit('stopPartitionSessionRequest', value!.stopPartitionSessionRequest! as Ydb.Topic.StreamReadMessage.StopPartitionSessionRequest); - else if (value!.updateTokenResponse) this.events.emit('updateTokenResponse', value!.updateTokenResponse! as Ydb.Topic.UpdateTokenResponse); - }); + this.topicService.metadata); + this.readBidiStream.on('data', (value) => { + console.info(2000, value) + try { + YdbError.checkStatus(value!) + } catch (err) { + this.events.emit('error', err as Error); + return; + } + if (value!.readResponse) this.events.emit('readResponse', value!.readResponse! as Ydb.Topic.StreamReadMessage.ReadResponse); + else if (value!.initResponse) { + this._state = TopicWriteStreamState.Active; + this.events.emit('initResponse', value!.initResponse! as Ydb.Topic.StreamReadMessage.InitResponse); + } else if (value!.commitOffsetResponse) this.events.emit('commitOffsetResponse', value!.commitOffsetResponse! as Ydb.Topic.StreamReadMessage.CommitOffsetResponse); + else if (value!.partitionSessionStatusResponse) this.events.emit('partitionSessionStatusResponse', value!.partitionSessionStatusResponse! as Ydb.Topic.StreamReadMessage.PartitionSessionStatusResponse); + else if (value!.startPartitionSessionRequest) this.events.emit('startPartitionSessionRequest', value!.startPartitionSessionRequest! as Ydb.Topic.StreamReadMessage.StartPartitionSessionRequest); + else if (value!.stopPartitionSessionRequest) this.events.emit('stopPartitionSessionRequest', value!.stopPartitionSessionRequest! as Ydb.Topic.StreamReadMessage.StopPartitionSessionRequest); + else if (value!.updateTokenResponse) this.events.emit('updateTokenResponse', value!.updateTokenResponse! as Ydb.Topic.UpdateTokenResponse); + }) + this.readBidiStream.on('error', (err) => { + if (TransportError.isMember(err)) err = TransportError.convertToYdbError(err); + this.events.emit('error', err); + }) + // this.writeBidiStream.on('status', (v) => { + // console.info(8200, v); + // }) + // this.writeBidiStream.on('metadata', (v) => { + // console.info(8000, v); + // }) + // this.writeBidiStream.on('finish', (v: any) => { + // console.info(8060, v); + // }) + this.initRequest(opts); }; @@ -182,14 +192,3 @@ export class TopicReadStream { // TODO: Update token when the auth provider returns a new one } - -// const obj = new InternalTopicWrite() as unknown as (TypedEmitter & Omit); -// -// obj.on('writeResponse', (args) => { -// -// }); -// -// obj.on("test", () => { -// -// }) -// diff --git a/src/topic/topic-service.ts b/src/topic/topic-service.ts index c0300274..4723e1f4 100644 --- a/src/topic/topic-service.ts +++ b/src/topic/topic-service.ts @@ -56,13 +56,12 @@ export class TopicService extends AuthenticatedService {} }) => { + writerStream.events.once(STREAM_DESTROYED, (stream: { dispose: () => {} }) => { const index = this.allStreams.findIndex(v => v === stream) if (index >= 0) this.allStreams.splice(index, 1); }); this.allStreams.push(writerStream); // TODO: Is is possible to have multiple streams in a time? I.e. while server errors return writerStream; - } public async openReadStream(opts: ReadStreamInitArgs) { diff --git a/src/topic/topic-write-stream.ts b/src/topic/topic-write-stream.ts index 5ee3506f..80c0bb0d 100644 --- a/src/topic/topic-write-stream.ts +++ b/src/topic/topic-write-stream.ts @@ -1,12 +1,10 @@ import {Logger} from "../logger/simple-logger"; import {Ydb} from "ydb-sdk-proto"; import {TopicService} from "./topic-service"; -import FromClient = Ydb.Topic.StreamWriteMessage.FromClient; -import FromServer = Ydb.Topic.StreamWriteMessage.FromServer; -import {ClientWritableStream/*, ServiceError*/} from "@grpc/grpc-js/build/src/call"; import EventEmitter from "events"; -import {TransportError, YdbError} from "../errors"; import TypedEmitter from "typed-emitter/rxjs"; +import {ClientDuplexStream} from "@grpc/grpc-js/build/src/call"; +import {TransportError, YdbError} from "../errors"; // TODO: Typed events // TODO: Proper stream close/dispose and a reaction on end of stream from server @@ -58,7 +56,7 @@ export const enum TopicWriteStreamState { Closed } -export class TopicWriteStream extends EventEmitter /*implements TypedEmitter*/ { +export class TopicWriteStream { public events = new EventEmitter() as TypedEmitter; private _state: TopicWriteStreamState = TopicWriteStreamState.Init; @@ -66,48 +64,53 @@ export class TopicWriteStream extends EventEmitter /*implements TypedEmitter; + public writeBidiStream?: ClientDuplexStream; constructor( opts: WriteStreamInitArgs, private topicService: TopicService, // @ts-ignore private _logger: Logger) { - super(); this.writeBidiStream = this.topicService.grpcServiceClient! - .makeClientStreamRequest( + .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamWrite', - (v: FromClient) => FromClient.encode(v).finish() as Buffer, - FromServer.decode, - this.topicService.metadata, - (err: any /* ServiceError */, value?: FromServer) => { - try { - if (TransportError.isMember(err)) throw TransportError.convertToYdbError(err); - if (err) throw err; - YdbError.checkStatus(value!) - } catch (err) { - // TODO: Process end of stream - this.emit('error', err); - return; - } - if (value!.writeResponse) this.events.emit('writeResponse', value!.writeResponse!); - else if (value!.initResponse) { - this._state = TopicWriteStreamState.Active; - this.events.emit('initResponse', value!.initResponse!); - } else if (value!.updateTokenResponse) this.events.emit('writeResponse', value!.updateTokenResponse!); - - - // end of stream - // close / dispose() - }); - this.initRequest(opts); + (v: Ydb.Topic.StreamWriteMessage.FromClient) => Ydb.Topic.StreamWriteMessage.FromClient.encode(v).finish() as Buffer, + Ydb.Topic.StreamWriteMessage.FromServer.decode, + this.topicService.metadata); + + this.writeBidiStream.on('data', (value) => { + try { + YdbError.checkStatus(value!) + } catch (err) { + this.events.emit('error', err as Error); + return; + } + if (value!.writeResponse) this.events.emit('writeResponse', value!.writeResponse!); + else if (value!.initResponse) { + this._state = TopicWriteStreamState.Active; + this.events.emit('initResponse', value!.initResponse!); + } else if (value!.updateTokenResponse) this.events.emit('writeResponse', value!.updateTokenResponse!); + }) + this.writeBidiStream.on('error', (err) => { + if (TransportError.isMember(err)) err = TransportError.convertToYdbError(err); + this.events.emit('error', err); + }) + // this.writeBidiStream.on('status', (v) => { + // console.info(8200, v); + // }) + // this.writeBidiStream.on('metadata', (v) => { + // console.info(8000, v); + // }) + // this.writeBidiStream.on('finish', (v: any) => { + // console.info(8060, v); + // }) + this.initRequest(opts); // TODO: Think of retry cycle }; private initRequest(opts: WriteStreamInitArgs) { if (!this.writeBidiStream) throw new Error('Stream is not opened') - console.info(6000, Ydb.Topic.StreamWriteMessage.InitRequest.create(opts)) this.writeBidiStream.write( - FromClient.create({ + Ydb.Topic.StreamWriteMessage.FromClient.create({ initRequest: Ydb.Topic.StreamWriteMessage.InitRequest.create(opts), })); } @@ -115,7 +118,7 @@ export class TopicWriteStream extends EventEmitter /*implements TypedEmitter