From 9b2a6db0e4fca61be3fe892d7c2a80cc9b132d99 Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Tue, 1 Oct 2024 15:39:31 +0300 Subject: [PATCH 1/3] chore: wip --- examples/package.json | 8 +++++--- examples/topic-service-example/README.md | 1 - .../index.ts.md => topic-service/index.ts} | 15 ++++++++------- examples/tsconfig.json | 3 ++- 4 files changed, 15 insertions(+), 12 deletions(-) delete mode 100644 examples/topic-service-example/README.md rename examples/{topic-service-example/index.ts.md => topic-service/index.ts} (86%) diff --git a/examples/package.json b/examples/package.json index 57560331..c858b538 100644 --- a/examples/package.json +++ b/examples/package.json @@ -3,8 +3,9 @@ "version": "0.0.1", "private": true, "scripts": { - "build": "tsc", + "build": "tsc ./**/*.ts", "basic-v1": "node build/basic-example-v1/index.js", + "basic-v22": "node build/basic-example-v2/index.js", "auth:access-token-credentials": "node build/auth/access-token-credentials/index.js", "auth:anonymous-credentials": "node build/auth/anonymous-credentials/index.js", "auth:environ": "node build/auth/environ/index.js", @@ -17,7 +18,8 @@ "scheme-client": "node build/scheme-client/index.js", "type-utils": "node build/type-utils/index.js", "url-shortener": "node build/url-shortener/index.js", - "query-service": "node build/query-service/index.js" + "query-service": "node build/query-service/index.js", + "topic-service": "node build/topic-service/index.js" }, "dependencies": { "@yandex-cloud/nodejs-sdk": "^2.4.1", @@ -29,7 +31,7 @@ "devDependencies": { "@types/crc": "^3.4.0", "@types/express": "~4.17.6", - "@types/node": "10.17.20", + "@types/node": "^12.20.55", "@types/yargs": "^15.0.9", "crc": "^3.8.0", "express": "^4.17.1", diff --git a/examples/topic-service-example/README.md b/examples/topic-service-example/README.md deleted file mode 100644 index c8f4bc82..00000000 --- a/examples/topic-service-example/README.md +++ /dev/null @@ -1 +0,0 @@ -Due to a problem with a reference to json - TEMPORARY example is as md-file diff --git a/examples/topic-service-example/index.ts.md b/examples/topic-service/index.ts similarity index 86% rename from examples/topic-service-example/index.ts.md rename to examples/topic-service/index.ts index c2d1d80b..29e7ce51 100644 --- a/examples/topic-service-example/index.ts.md +++ b/examples/topic-service/index.ts @@ -38,10 +38,6 @@ async function main() { }); const promises = []; for (let n = 0; n < 4; n++) { - // ((writer as any).innerWriteStream as TopicWriteStreamWithEvents).close(Context.createNew().ctx, new Error('Fake error')); - - // await sleep(3000); // TODO: - promises.push(writer.sendMessages({ codec: Ydb.Topic.Codec.CODEC_RAW, messages: [{ @@ -60,9 +56,14 @@ async function main() { consumer: 'demo', receiveBufferSizeInBytes: 10_000_000, }); - for await (const message of reader.messages) { - console.info(`Message: ${message.data!.toString()}`); - await message.commit(); + try { + for await (const message of reader.messages) { + console.info(`Message: ${message.data!.toString()}`); + await message.commit(); + } + } catch (err) { + if (!Context.isTimeout(err)) throw err; + console.info('Timeout is over!'); } await reader.close(); // graceful close() - complete when all messages are commited } diff --git a/examples/tsconfig.json b/examples/tsconfig.json index 6f261313..080202e8 100644 --- a/examples/tsconfig.json +++ b/examples/tsconfig.json @@ -17,7 +17,8 @@ "forceConsistentCasingInFileNames": true, "skipLibCheck": true, "experimentalDecorators": true, - "allowJs": true + "allowJs": true, + "resolveJsonModule": true }, "include": [ "**/*.ts" From 22554b317cb071aba806344bf01aa3c7a8a6a2dc Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Thu, 3 Oct 2024 12:58:20 +0300 Subject: [PATCH 2/3] chore: fix topic example --- examples/topic-service/index.ts | 101 ++++++++++-------- examples/topic-service/s.ts | 53 +++++++++ .../topic-write-stream-with-events.ts | 2 + src/topic/topic-client.ts | 3 - src/topic/topic-reader.ts | 7 +- src/topic/topic-writer.ts | 1 + 6 files changed, 115 insertions(+), 52 deletions(-) create mode 100644 examples/topic-service/s.ts diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index 29e7ce51..ffe058bd 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,7 +1,7 @@ import {Driver as YDB} from '../../src'; -import {AnonymousAuthService} from "../../src/credentials/anonymous-auth-service"; -import {Ydb} from "ydb-sdk-proto"; +import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service'; import {SimpleLogger} from "../../src/logger/simple-logger"; +import {Ydb} from "ydb-sdk-proto"; import {Context} from "../../src/context"; require('dotenv').config(); @@ -17,55 +17,64 @@ async function main() { logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), }); if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); - await db.topic.createTopic({ - path: 'demoTopic', - consumers: [{ - name: 'demo', - }], - }); - const writer = await db.topic.createWriter({ - path: 'demoTopic', - // producerId: '...', // will be genereted automatically - // messageGroupId: '...' // will be the same as producerId - getLastSeqNo: true, // seqNo will be assigned automatically - }); - await writer.sendMessages({ - codec: Ydb.Topic.Codec.CODEC_RAW, - messages: [{ - data: Buffer.from('Hello, world'), - uncompressedSize: 'Hello, world'.length, - }], - }); - const promises = []; - for (let n = 0; n < 4; n++) { - promises.push(writer.sendMessages({ + try { + + db.topic; + + await db.topic.createTopic({ + path: 'demoTopic', + consumers: [{ + name: 'demo', + }], + }); + const writer = await db.topic.createWriter({ + path: 'demoTopic', + // producerId: '...', // will be genereted automatically + // messageGroupId: '...' // will be the same as producerId + getLastSeqNo: true, // seqNo will be assigned automatically + }); + await writer.sendMessages({ codec: Ydb.Topic.Codec.CODEC_RAW, messages: [{ - data: Buffer.from(`Message N${n}`), - uncompressedSize: `Message N${n}`.length, + data: Buffer.from('Hello, world'), + uncompressedSize: 'Hello, world'.length, }], - })); - } - await Promise.all(promises); - const reader = await db.topic.createReader(Context.createNew({ - timeout: 3000, - }).ctx, { - topicsReadSettings: [{ - path: 'demoTopic', - }], - consumer: 'demo', - receiveBufferSizeInBytes: 10_000_000, - }); - try { - for await (const message of reader.messages) { - console.info(`Message: ${message.data!.toString()}`); - await message.commit(); + }); + const promises = []; + for (let n = 0; n < 4; n++) { + promises.push(writer.sendMessages({ + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.from(`Message N${n}`), + uncompressedSize: `Message N${n}`.length, + }], + })); + } + await writer.close(); + + await Promise.all(promises); + const reader = await db.topic.createReader(Context.createNew({ + timeout: 3000, + }).ctx, { + topicsReadSettings: [{ + path: 'demoTopic', + }], + consumer: 'demo', + receiveBufferSizeInBytes: 10_000_000, + }); + try { + for await (const message of reader.messages) { + console.info(`Message: ${message.data!.toString()}`); + await message.commit(); + } + } catch (err) { + if (!Context.isTimeout(err)) throw err; + console.info('Timeout is over!'); } - } catch (err) { - if (!Context.isTimeout(err)) throw err; - console.info('Timeout is over!'); + await reader.close(true); // graceful close() - complete when all messages are commited + } finally { + await db.destroy(); } - await reader.close(); // graceful close() - complete when all messages are commited } main(); diff --git a/examples/topic-service/s.ts b/examples/topic-service/s.ts new file mode 100644 index 00000000..bdb37255 --- /dev/null +++ b/examples/topic-service/s.ts @@ -0,0 +1,53 @@ +import {Driver as YDB} from '../../src'; +import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service'; +import {SimpleLogger} from "../../src/logger/simple-logger"; +import {Context} from "../../src/context"; +import {TopicWriteStreamWithEvents} from "../../src/topic/internal/topic-write-stream-with-events"; +import {sleep} from "../../src/utils"; +// import {Ydb} from "ydb-sdk-proto"; +// import {Context} from "../../src/context"; + +require('dotenv').config(); + +const DATABASE = '/local'; +const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; + +async function main() { + const db = new YDB({ + endpoint: ENDPOINT, + database: DATABASE, + authService: new AnonymousAuthService(), + logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), + }); + if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); + try { + + db.topic; + + await db.topic.createTopic({ + path: 'demoTopic', + consumers: [{ + name: 'demo', + }], + }); + + const stream = new TopicWriteStreamWithEvents(Context.createNew().ctx, { + path: 'demoTopic', + // producerId: '...', // will be genereted automatically + // messageGroupId: '...' // will be the same as producerId + getLastSeqNo: true, // seqNo will be assigned automatically + }, await (db as any).discoveryService.getTopicNodeClient(), (db as any).logger); + + stream.events.on('initResponse', (_resp) => { + (db as any).logger.trace('TopicWriter.on "initResponse"'); + }); + + stream.close(Context.createNew().ctx); + + await sleep(3000); + } finally { + // await db.destroy(); + } +} + +main(); diff --git a/src/topic/internal/topic-write-stream-with-events.ts b/src/topic/internal/topic-write-stream-with-events.ts index be330a8f..d1d3f736 100644 --- a/src/topic/internal/topic-write-stream-with-events.ts +++ b/src/topic/internal/topic-write-stream-with-events.ts @@ -86,6 +86,7 @@ export class TopicWriteStreamWithEvents { err = TransportError.convertToYdbError(err as (Error & StatusObject)); this.events.emit('error', err); } + this.writeBidiStream.end(); }); this.initRequest(ctx, args); }; @@ -127,6 +128,7 @@ export class TopicWriteStreamWithEvents { if (this.reasonForClose) throw new Error('Stream is not open'); this.reasonForClose = error; this.writeBidiStream!.cancel(); + this.writeBidiStream!.end(); } // TODO: Add [dispose] that calls close() diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index c7dc17da..eafa9c64 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -20,11 +20,8 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd export class TopicClient extends EventEmitter { // TODO: Reconsider why I need to have EventEmitter in any client private service?: TopicNodeClient; - // private retrier: RetryStrategy; - constructor(private settings: IClientSettings) { super(); - // this.retrier = new RetryStrategy(new RetryParameters({maxRetries: 0}), this.settings.logger); } /** diff --git a/src/topic/topic-reader.ts b/src/topic/topic-reader.ts index bcf675d2..ca10cdff 100644 --- a/src/topic/topic-reader.ts +++ b/src/topic/topic-reader.ts @@ -85,6 +85,7 @@ export class TopicReader { private queue: Message[] = []; private waitNextResolve?: (value: unknown) => void; private innerReadStream?: TopicReadStreamWithEvents; + private closePromise?: Promise; private _messages?: { [Symbol.asyncIterator]: () => AsyncGenerator }; @@ -123,9 +124,8 @@ export class TopicReader { logger.trace('%s: new TopicReader', ctx); if (!(readStreamArgs.receiveBufferSizeInBytes > 0)) throw new Error('receivingBufferSize must be greater than 0'); let onCancelUnsub: CtxUnsubcribe; - if (ctx.onCancel) onCancelUnsub = ctx.onCancel((cause) => { + if (ctx.onCancel) onCancelUnsub = ctx.onCancel((_cause) => { if (this.reasonForClose) return; - this.reasonForClose = cause; this.close(ctx, true) }); // background process of sending and retrying @@ -292,12 +292,13 @@ export class TopicReader { this.queue.length = 0; // drop rest of messages if (this.waitNextResolve) this.waitNextResolve(undefined); } else { - return new Promise((resolve) => { + this.closePromise = new Promise((resolve) => { this.closeResolve = resolve; }); } await this.innerReadStream!.close(ctx); } + return this.closePromise; } private async closeInnerStream(ctx: Context) { diff --git a/src/topic/topic-writer.ts b/src/topic/topic-writer.ts index 58fe5981..e38da8d4 100644 --- a/src/topic/topic-writer.ts +++ b/src/topic/topic-writer.ts @@ -79,6 +79,7 @@ export class TopicWriter { logger.debug('%s: failed: %o', ctx, err); this.reasonForClose = err; this.spreadError(ctx, err); + this.close(); }) .finally(() => { if (onCancelUnsub) onCancelUnsub(); From 00f2b561f6f2445bdcb52d995f4bad724c64769f Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Thu, 3 Oct 2024 13:15:02 +0300 Subject: [PATCH 3/3] chore: fix CI --- examples/package.json | 2 +- examples/topic-service/s.ts | 53 ------------------------------------- 2 files changed, 1 insertion(+), 54 deletions(-) delete mode 100644 examples/topic-service/s.ts diff --git a/examples/package.json b/examples/package.json index c858b538..f2a618d6 100644 --- a/examples/package.json +++ b/examples/package.json @@ -3,7 +3,7 @@ "version": "0.0.1", "private": true, "scripts": { - "build": "tsc ./**/*.ts", + "build": "tsc", "basic-v1": "node build/basic-example-v1/index.js", "basic-v22": "node build/basic-example-v2/index.js", "auth:access-token-credentials": "node build/auth/access-token-credentials/index.js", diff --git a/examples/topic-service/s.ts b/examples/topic-service/s.ts deleted file mode 100644 index bdb37255..00000000 --- a/examples/topic-service/s.ts +++ /dev/null @@ -1,53 +0,0 @@ -import {Driver as YDB} from '../../src'; -import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service'; -import {SimpleLogger} from "../../src/logger/simple-logger"; -import {Context} from "../../src/context"; -import {TopicWriteStreamWithEvents} from "../../src/topic/internal/topic-write-stream-with-events"; -import {sleep} from "../../src/utils"; -// import {Ydb} from "ydb-sdk-proto"; -// import {Context} from "../../src/context"; - -require('dotenv').config(); - -const DATABASE = '/local'; -const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; - -async function main() { - const db = new YDB({ - endpoint: ENDPOINT, - database: DATABASE, - authService: new AnonymousAuthService(), - logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), - }); - if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); - try { - - db.topic; - - await db.topic.createTopic({ - path: 'demoTopic', - consumers: [{ - name: 'demo', - }], - }); - - const stream = new TopicWriteStreamWithEvents(Context.createNew().ctx, { - path: 'demoTopic', - // producerId: '...', // will be genereted automatically - // messageGroupId: '...' // will be the same as producerId - getLastSeqNo: true, // seqNo will be assigned automatically - }, await (db as any).discoveryService.getTopicNodeClient(), (db as any).logger); - - stream.events.on('initResponse', (_resp) => { - (db as any).logger.trace('TopicWriter.on "initResponse"'); - }); - - stream.close(Context.createNew().ctx); - - await sleep(3000); - } finally { - // await db.destroy(); - } -} - -main();