Skip to content

Commit

Permalink
Merge pull request #412 from ydb-platform/topic
Browse files Browse the repository at this point in the history
Fix topic example
  • Loading branch information
Alexey Zorkaltsev authored Oct 3, 2024
2 parents 6bcbda3 + dd8e24e commit 97202a9
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 80 deletions.
6 changes: 4 additions & 2 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"scripts": {
"build": "tsc",
"basic-v1": "node build/basic-example-v1/index.js",
"basic-v22": "node build/basic-example-v2/index.js",
"auth:access-token-credentials": "node build/auth/access-token-credentials/index.js",
"auth:anonymous-credentials": "node build/auth/anonymous-credentials/index.js",
"auth:environ": "node build/auth/environ/index.js",
Expand All @@ -17,7 +18,8 @@
"scheme-client": "node build/scheme-client/index.js",
"type-utils": "node build/type-utils/index.js",
"url-shortener": "node build/url-shortener/index.js",
"query-service": "node build/query-service/index.js"
"query-service": "node build/query-service/index.js",
"topic-service": "node build/topic-service/index.js"
},
"dependencies": {
"@yandex-cloud/nodejs-sdk": "^2.4.1",
Expand All @@ -29,7 +31,7 @@
"devDependencies": {
"@types/crc": "^3.4.0",
"@types/express": "~4.17.6",
"@types/node": "10.17.20",
"@types/node": "^12.20.55",
"@types/yargs": "^15.0.9",
"crc": "^3.8.0",
"express": "^4.17.1",
Expand Down
1 change: 0 additions & 1 deletion examples/topic-service-example/README.md

This file was deleted.

70 changes: 0 additions & 70 deletions examples/topic-service-example/index.ts.md

This file was deleted.

80 changes: 80 additions & 0 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import {Driver as YDB} from '../../src';
import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service';
import {SimpleLogger} from "../../src/logger/simple-logger";
import {Ydb} from "ydb-sdk-proto";
import {Context} from "../../src/context";

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function main() {
const db = new YDB({
endpoint: ENDPOINT,
database: DATABASE,
authService: new AnonymousAuthService(),
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
try {

db.topic;

await db.topic.createTopic({
path: 'demoTopic',
consumers: [{
name: 'demo',
}],
});
const writer = await db.topic.createWriter({
path: 'demoTopic',
// producerId: '...', // will be genereted automatically
// messageGroupId: '...' // will be the same as producerId
getLastSeqNo: true, // seqNo will be assigned automatically
});
await writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from('Hello, world'),
uncompressedSize: 'Hello, world'.length,
}],
});
const promises = [];
for (let n = 0; n < 4; n++) {
promises.push(writer.sendMessages({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from(`Message N${n}`),
uncompressedSize: `Message N${n}`.length,
}],
}));
}
await writer.close();

await Promise.all(promises);
const reader = await db.topic.createReader(Context.createNew({
timeout: 3000,
}).ctx, {
topicsReadSettings: [{
path: 'demoTopic',
}],
consumer: 'demo',
receiveBufferSizeInBytes: 10_000_000,
});
try {
for await (const message of reader.messages) {
console.info(`Message: ${message.data!.toString()}`);
await message.commit();
}
} catch (err) {
if (!Context.isTimeout(err)) throw err;
console.info('Timeout is over!');
}
await reader.close(true); // graceful close() - complete when all messages are commited
} finally {
await db.destroy();
}
}

main();
3 changes: 2 additions & 1 deletion examples/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"forceConsistentCasingInFileNames": true,
"skipLibCheck": true,
"experimentalDecorators": true,
"allowJs": true
"allowJs": true,
"resolveJsonModule": true
},
"include": [
"**/*.ts"
Expand Down
2 changes: 2 additions & 0 deletions src/topic/internal/topic-write-stream-with-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export class TopicWriteStreamWithEvents {
err = TransportError.convertToYdbError(err as (Error & StatusObject));
this.events.emit('error', err);
}
this.writeBidiStream.end();
});
this.initRequest(ctx, args);
};
Expand Down Expand Up @@ -127,6 +128,7 @@ export class TopicWriteStreamWithEvents {
if (this.reasonForClose) throw new Error('Stream is not open');
this.reasonForClose = error;
this.writeBidiStream!.cancel();
this.writeBidiStream!.end();
}

// TODO: Add [dispose] that calls close()
Expand Down
3 changes: 0 additions & 3 deletions src/topic/topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd
export class TopicClient extends EventEmitter { // TODO: Reconsider why I need to have EventEmitter in any client
private service?: TopicNodeClient;

// private retrier: RetryStrategy;

constructor(private settings: IClientSettings) {
super();
// this.retrier = new RetryStrategy(new RetryParameters({maxRetries: 0}), this.settings.logger);
}

/**
Expand Down
7 changes: 4 additions & 3 deletions src/topic/topic-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class TopicReader {
private queue: Message[] = [];
private waitNextResolve?: (value: unknown) => void;
private innerReadStream?: TopicReadStreamWithEvents;
private closePromise?: Promise<void>;

private _messages?: { [Symbol.asyncIterator]: () => AsyncGenerator<Message, void> };

Expand Down Expand Up @@ -123,9 +124,8 @@ export class TopicReader {
logger.trace('%s: new TopicReader', ctx);
if (!(readStreamArgs.receiveBufferSizeInBytes > 0)) throw new Error('receivingBufferSize must be greater than 0');
let onCancelUnsub: CtxUnsubcribe;
if (ctx.onCancel) onCancelUnsub = ctx.onCancel((cause) => {
if (ctx.onCancel) onCancelUnsub = ctx.onCancel((_cause) => {
if (this.reasonForClose) return;
this.reasonForClose = cause;
this.close(ctx, true)
});
// background process of sending and retrying
Expand Down Expand Up @@ -292,12 +292,13 @@ export class TopicReader {
this.queue.length = 0; // drop rest of messages
if (this.waitNextResolve) this.waitNextResolve(undefined);
} else {
return new Promise<void>((resolve) => {
this.closePromise = new Promise<void>((resolve) => {
this.closeResolve = resolve;
});
}
await this.innerReadStream!.close(ctx);
}
return this.closePromise;
}

private async closeInnerStream(ctx: Context) {
Expand Down
1 change: 1 addition & 0 deletions src/topic/topic-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class TopicWriter {
logger.debug('%s: failed: %o', ctx, err);
this.reasonForClose = err;
this.spreadError(ctx, err);
this.close();
})
.finally(() => {
if (onCancelUnsub) onCancelUnsub();
Expand Down

0 comments on commit 97202a9

Please sign in to comment.