diff --git a/examples/package.json b/examples/package.json index 57560331..f2a618d6 100644 --- a/examples/package.json +++ b/examples/package.json @@ -5,6 +5,7 @@ "scripts": { "build": "tsc", "basic-v1": "node build/basic-example-v1/index.js", + "basic-v22": "node build/basic-example-v2/index.js", "auth:access-token-credentials": "node build/auth/access-token-credentials/index.js", "auth:anonymous-credentials": "node build/auth/anonymous-credentials/index.js", "auth:environ": "node build/auth/environ/index.js", @@ -17,7 +18,8 @@ "scheme-client": "node build/scheme-client/index.js", "type-utils": "node build/type-utils/index.js", "url-shortener": "node build/url-shortener/index.js", - "query-service": "node build/query-service/index.js" + "query-service": "node build/query-service/index.js", + "topic-service": "node build/topic-service/index.js" }, "dependencies": { "@yandex-cloud/nodejs-sdk": "^2.4.1", @@ -29,7 +31,7 @@ "devDependencies": { "@types/crc": "^3.4.0", "@types/express": "~4.17.6", - "@types/node": "10.17.20", + "@types/node": "^12.20.55", "@types/yargs": "^15.0.9", "crc": "^3.8.0", "express": "^4.17.1", diff --git a/examples/topic-service-example/README.md b/examples/topic-service-example/README.md deleted file mode 100644 index c8f4bc82..00000000 --- a/examples/topic-service-example/README.md +++ /dev/null @@ -1 +0,0 @@ -Due to a problem with a reference to json - TEMPORARY example is as md-file diff --git a/examples/topic-service-example/index.ts.md b/examples/topic-service-example/index.ts.md deleted file mode 100644 index c2d1d80b..00000000 --- a/examples/topic-service-example/index.ts.md +++ /dev/null @@ -1,70 +0,0 @@ -import {Driver as YDB} from '../../src'; -import {AnonymousAuthService} from "../../src/credentials/anonymous-auth-service"; -import {Ydb} from "ydb-sdk-proto"; -import {SimpleLogger} from "../../src/logger/simple-logger"; -import {Context} from "../../src/context"; - -require('dotenv').config(); - -const DATABASE = '/local'; -const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; - -async function main() { - const db = new YDB({ - endpoint: ENDPOINT, - database: DATABASE, - authService: new AnonymousAuthService(), - logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), - }); - if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); - await db.topic.createTopic({ - path: 'demoTopic', - consumers: [{ - name: 'demo', - }], - }); - const writer = await db.topic.createWriter({ - path: 'demoTopic', - // producerId: '...', // will be genereted automatically - // messageGroupId: '...' // will be the same as producerId - getLastSeqNo: true, // seqNo will be assigned automatically - }); - await writer.sendMessages({ - codec: Ydb.Topic.Codec.CODEC_RAW, - messages: [{ - data: Buffer.from('Hello, world'), - uncompressedSize: 'Hello, world'.length, - }], - }); - const promises = []; - for (let n = 0; n < 4; n++) { - // ((writer as any).innerWriteStream as TopicWriteStreamWithEvents).close(Context.createNew().ctx, new Error('Fake error')); - - // await sleep(3000); // TODO: - - promises.push(writer.sendMessages({ - codec: Ydb.Topic.Codec.CODEC_RAW, - messages: [{ - data: Buffer.from(`Message N${n}`), - uncompressedSize: `Message N${n}`.length, - }], - })); - } - await Promise.all(promises); - const reader = await db.topic.createReader(Context.createNew({ - timeout: 3000, - }).ctx, { - topicsReadSettings: [{ - path: 'demoTopic', - }], - consumer: 'demo', - receiveBufferSizeInBytes: 10_000_000, - }); - for await (const message of reader.messages) { - console.info(`Message: ${message.data!.toString()}`); - await message.commit(); - } - await reader.close(); // graceful close() - complete when all messages are commited -} - -main(); diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts new file mode 100644 index 00000000..ffe058bd --- /dev/null +++ b/examples/topic-service/index.ts @@ -0,0 +1,80 @@ +import {Driver as YDB} from '../../src'; +import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service'; +import {SimpleLogger} from "../../src/logger/simple-logger"; +import {Ydb} from "ydb-sdk-proto"; +import {Context} from "../../src/context"; + +require('dotenv').config(); + +const DATABASE = '/local'; +const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; + +async function main() { + const db = new YDB({ + endpoint: ENDPOINT, + database: DATABASE, + authService: new AnonymousAuthService(), + logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), + }); + if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); + try { + + db.topic; + + await db.topic.createTopic({ + path: 'demoTopic', + consumers: [{ + name: 'demo', + }], + }); + const writer = await db.topic.createWriter({ + path: 'demoTopic', + // producerId: '...', // will be genereted automatically + // messageGroupId: '...' // will be the same as producerId + getLastSeqNo: true, // seqNo will be assigned automatically + }); + await writer.sendMessages({ + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.from('Hello, world'), + uncompressedSize: 'Hello, world'.length, + }], + }); + const promises = []; + for (let n = 0; n < 4; n++) { + promises.push(writer.sendMessages({ + codec: Ydb.Topic.Codec.CODEC_RAW, + messages: [{ + data: Buffer.from(`Message N${n}`), + uncompressedSize: `Message N${n}`.length, + }], + })); + } + await writer.close(); + + await Promise.all(promises); + const reader = await db.topic.createReader(Context.createNew({ + timeout: 3000, + }).ctx, { + topicsReadSettings: [{ + path: 'demoTopic', + }], + consumer: 'demo', + receiveBufferSizeInBytes: 10_000_000, + }); + try { + for await (const message of reader.messages) { + console.info(`Message: ${message.data!.toString()}`); + await message.commit(); + } + } catch (err) { + if (!Context.isTimeout(err)) throw err; + console.info('Timeout is over!'); + } + await reader.close(true); // graceful close() - complete when all messages are commited + } finally { + await db.destroy(); + } +} + +main(); diff --git a/examples/tsconfig.json b/examples/tsconfig.json index 6f261313..080202e8 100644 --- a/examples/tsconfig.json +++ b/examples/tsconfig.json @@ -17,7 +17,8 @@ "forceConsistentCasingInFileNames": true, "skipLibCheck": true, "experimentalDecorators": true, - "allowJs": true + "allowJs": true, + "resolveJsonModule": true }, "include": [ "**/*.ts" diff --git a/src/topic/internal/topic-write-stream-with-events.ts b/src/topic/internal/topic-write-stream-with-events.ts index be330a8f..d1d3f736 100644 --- a/src/topic/internal/topic-write-stream-with-events.ts +++ b/src/topic/internal/topic-write-stream-with-events.ts @@ -86,6 +86,7 @@ export class TopicWriteStreamWithEvents { err = TransportError.convertToYdbError(err as (Error & StatusObject)); this.events.emit('error', err); } + this.writeBidiStream.end(); }); this.initRequest(ctx, args); }; @@ -127,6 +128,7 @@ export class TopicWriteStreamWithEvents { if (this.reasonForClose) throw new Error('Stream is not open'); this.reasonForClose = error; this.writeBidiStream!.cancel(); + this.writeBidiStream!.end(); } // TODO: Add [dispose] that calls close() diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index c7dc17da..eafa9c64 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -20,11 +20,8 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd export class TopicClient extends EventEmitter { // TODO: Reconsider why I need to have EventEmitter in any client private service?: TopicNodeClient; - // private retrier: RetryStrategy; - constructor(private settings: IClientSettings) { super(); - // this.retrier = new RetryStrategy(new RetryParameters({maxRetries: 0}), this.settings.logger); } /** diff --git a/src/topic/topic-reader.ts b/src/topic/topic-reader.ts index bcf675d2..ca10cdff 100644 --- a/src/topic/topic-reader.ts +++ b/src/topic/topic-reader.ts @@ -85,6 +85,7 @@ export class TopicReader { private queue: Message[] = []; private waitNextResolve?: (value: unknown) => void; private innerReadStream?: TopicReadStreamWithEvents; + private closePromise?: Promise; private _messages?: { [Symbol.asyncIterator]: () => AsyncGenerator }; @@ -123,9 +124,8 @@ export class TopicReader { logger.trace('%s: new TopicReader', ctx); if (!(readStreamArgs.receiveBufferSizeInBytes > 0)) throw new Error('receivingBufferSize must be greater than 0'); let onCancelUnsub: CtxUnsubcribe; - if (ctx.onCancel) onCancelUnsub = ctx.onCancel((cause) => { + if (ctx.onCancel) onCancelUnsub = ctx.onCancel((_cause) => { if (this.reasonForClose) return; - this.reasonForClose = cause; this.close(ctx, true) }); // background process of sending and retrying @@ -292,12 +292,13 @@ export class TopicReader { this.queue.length = 0; // drop rest of messages if (this.waitNextResolve) this.waitNextResolve(undefined); } else { - return new Promise((resolve) => { + this.closePromise = new Promise((resolve) => { this.closeResolve = resolve; }); } await this.innerReadStream!.close(ctx); } + return this.closePromise; } private async closeInnerStream(ctx: Context) { diff --git a/src/topic/topic-writer.ts b/src/topic/topic-writer.ts index 58fe5981..e38da8d4 100644 --- a/src/topic/topic-writer.ts +++ b/src/topic/topic-writer.ts @@ -79,6 +79,7 @@ export class TopicWriter { logger.debug('%s: failed: %o', ctx, err); this.reasonForClose = err; this.spreadError(ctx, err); + this.close(); }) .finally(() => { if (onCancelUnsub) onCancelUnsub();