Skip to content

Commit

Permalink
Merge pull request #421 from ydb-platform/topic
Browse files Browse the repository at this point in the history
Topic
  • Loading branch information
Alexey Zorkaltsev authored Oct 22, 2024
2 parents 14597f5 + dc64f5b commit ca9d15d
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 49 deletions.
22 changes: 8 additions & 14 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk';
import {Driver as YDB, getCredentialsFromEnv, Context, SimpleLogger} from 'ydb-sdk';
import {Ydb} from "ydb-sdk-proto";
import {getDefaultLogger} from "../../src/logger/get-default-logger";
// import {main} from "../utils";
import Codec = Ydb.Topic.Codec;

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function run() {
const logger = getDefaultLogger();
// const logger = getDefaultLogger();
const logger = new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'});
const authService = getCredentialsFromEnv(logger);
const db = new YDB({
endpoint: ENDPOINT, // i.e.: grc(s)://<x.x.x.x>
database: DATABASE, // i.e.: '/local'
authService, logger
// logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
connectionString: process.env.YDB_CONNECTION_STRING || 'grpc://localhost:2136?database=/local',
authService, logger,
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
if (!(await db.ready(30000))) throw new Error('Driver is not ready!');
try {
await db.topic.createTopic({
path: 'demoTopic',
Expand All @@ -43,9 +37,9 @@ async function run() {
},
});

logger.info(await db.topic.describeTopic({
await db.topic.describeTopic({
path: 'demoTopic',
}));
});

const writer = await db.topic.createWriter({
path: 'demoTopic',
Expand Down
4 changes: 3 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ export class TransportError extends YdbError {
} catch (error) {}
return new Error(`Unexpected transport error code ${e.code}! Error itself: ${errStr}`);
} else {
return new ErrCls(
const ydbErr = new ErrCls(
`${ErrCls.name} (code ${ErrCls.status}): ${e.name}: ${e.message}. ${e.details}`,
);
ydbErr.stack = e.stack;
return ydbErr;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,7 @@ export {RemoveDirectorySettings} from "./schema/scheme-service";
export {MakeDirectorySettings} from "./schema/scheme-service";

export {ParsedConnectionString, parseConnectionString} from "./utils/parse-connection-string";

export {QueryClient, ResultSet, RowType} from "./query";

export {SimpleLogger} from "./logger/simple-logger";
export {getDefaultLogger} from "./logger/get-default-logger";
30 changes: 21 additions & 9 deletions src/topic/internal/internal-topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
// @ts-ignore
public destroy();
public /*async*/ destroy(ctx: Context) {
this.logger.trace('%s: InternalTopicClient.destroy()', ctx);
let destroyPromise;
if (this.allStreams.length > 0) {
destroyPromise = new Promise((resolve) => {
Expand All @@ -78,13 +79,15 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
}

public async openWriteStreamWithEvents(ctx: Context, args: InternalWriteStreamInitArgs & Pick<Ydb.Topic.StreamWriteMessage.IInitRequest, 'messageGroupId'>) {
this.logger.trace('%s: InternalTopicClient.openWriteStreamWithEvents()', ctx);
if (args.producerId === undefined || args.producerId === null) {
const newGUID = uuid_v4();
args = {...args, producerId: newGUID, messageGroupId: newGUID}
} else if (args.messageGroupId === undefined || args.messageGroupId === null) {
args = {...args, messageGroupId: args.producerId};
}
const writerStream = new InternalTopicWriteStream(ctx, args, this, this.logger);
const writerStream = new InternalTopicWriteStream(ctx, this, this.logger);
await writerStream.init(ctx, args);
writerStream.events.once('end', () => {
const index = this.allStreams.findIndex(v => v === writerStream)
if (index >= 0) this.allStreams.splice(index, 1);
Expand All @@ -95,7 +98,9 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
}

public async openReadStreamWithEvents(ctx: Context, args: InternalReadStreamInitArgs) {
const readStream = new InternalTopicReadStream(ctx, args, this, this.logger);
this.logger.trace('%s: InternalTopicClient.openReadStreamWithEvents()', ctx);
const readStream = new InternalTopicReadStream(ctx, this, this.logger);
await readStream.init(ctx, args);
readStream.events.once('end', () => {
const index = this.allStreams.findIndex(v => v === readStream)
if (index >= 0) this.allStreams.splice(index, 1);
Expand All @@ -105,31 +110,38 @@ export class InternalTopicClient extends AuthenticatedService<Ydb.Topic.V1.Topic
return readStream;
}

public async commitOffset(_ctx: Context, request: InternalCommitOffsetArgs) {
public async commitOffset(ctx: Context, request: InternalCommitOffsetArgs) {
this.logger.trace('%s: InternalTopicClient.commitOffset()', ctx);
return (await this.api.commitOffset(request)) as InternalCommitOffsetResult;
}

public async updateOffsetsInTransaction(_ctx: Context, request: InternalUpdateOffsetsInTransactionArgs) {
public async updateOffsetsInTransaction(ctx: Context, request: InternalUpdateOffsetsInTransactionArgs) {
this.logger.trace('%s: InternalTopicClient.updateOffsetsInTransaction()', ctx);
return (await this.api.updateOffsetsInTransaction(request)) as InternalUpdateOffsetsInTransactionResult;
}

public async createTopic(_ctx: Context, request: InternalCreateTopicArgs) {
public async createTopic(ctx: Context, request: InternalCreateTopicArgs) {
this.logger.trace('%s: InternalTopicClient.createTopic()', ctx);
return (await this.api.createTopic(request)) as InternalCreateTopicResult;
}

public async describeTopic(_ctx: Context, request: InternalDescribeTopicArgs) {
public async describeTopic(ctx: Context, request: InternalDescribeTopicArgs) {
this.logger.trace('%s: InternalTopicClient.describeTopic()', ctx);
return (await this.api.describeTopic(request)) as InternalDescribeTopicResult;
}

public async describeConsumer(_ctx: Context, request: InternalDescribeConsumerArgs) {
public async describeConsumer(ctx: Context, request: InternalDescribeConsumerArgs) {
this.logger.trace('%s: InternalTopicClient.describeConsumer()', ctx);
return (await this.api.describeConsumer(request)) as InternalDescribeConsumerResult;
}

public async alterTopic(_ctx: Context, request: InternalAlterTopicArgs) {
public async alterTopic(ctx: Context, request: InternalAlterTopicArgs) {
this.logger.trace('%s: InternalTopicClient.alterTopic()', ctx);
return (await this.api.alterTopic(request)) as InternalAlterTopicResult;
}

public async dropTopic(_ctx: Context, request: InternalDropTopicArgs) {
public async dropTopic(ctx: Context, request: InternalDropTopicArgs) {
this.logger.trace('%s: InternalTopicClient.dropTopic()', ctx);
return (await this.api.dropTopic(request)) as InternalDropTopicResult;
}
}
19 changes: 8 additions & 11 deletions src/topic/internal/internal-topic-read-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ export type ReadStreamEvents = {
end: (cause: Error) => void,
}

export const enum TopicWriteStreamState {
Init,
Active,
Closing,
Closed
}

export class InternalTopicReadStream {
public events = new EventEmitter() as TypedEmitter<ReadStreamEvents>;

Expand All @@ -61,12 +54,15 @@ export class InternalTopicReadStream {

constructor(
ctx: Context,
args: InternalReadStreamInitArgs,
private topicService: InternalTopicClient,
// @ts-ignore
public readonly logger: Logger) {
this.logger.trace('%s: new TopicReadStreamWithEvents()', ctx);
this.topicService.updateMetadata();
};

public async init(ctx: Context, args: InternalReadStreamInitArgs) {
this.logger.trace('%s: InternalTopicReadStream.init()', ctx);
await this.topicService.updateMetadata();
this.readBidiStream = this.topicService.grpcServiceClient!
.makeBidiStreamRequest<Ydb.Topic.StreamReadMessage.FromClient, Ydb.Topic.StreamReadMessage.FromServer>(
'/Ydb.Topic.V1.TopicService/StreamRead',
Expand Down Expand Up @@ -111,9 +107,10 @@ export class InternalTopicReadStream {
}
})
this.initRequest(ctx, args);
};

private initRequest(ctx: Context, args: InternalReadStreamInitArgs) {
}

public initRequest(ctx: Context, args: InternalReadStreamInitArgs) {
this.logger.trace('%s: TopicReadStreamWithEvents.initRequest()', ctx);
this.readBidiStream!.write(
Ydb.Topic.StreamReadMessage.create({
Expand Down
22 changes: 14 additions & 8 deletions src/topic/internal/internal-topic-write-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,24 @@ export type WriteStreamEvents = {

export class InternalTopicWriteStream {
private reasonForClose?: Error;
private writeBidiStream: ClientDuplexStream<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>;
private writeBidiStream?: ClientDuplexStream<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>;

public readonly events = new EventEmitter() as TypedEmitter<WriteStreamEvents>;

constructor(
ctx: Context,
args: InternalWriteStreamInitArgs,
private topicService: InternalTopicClient,
// @ts-ignore
private logger: Logger) {
this.logger.trace('%s: new TopicWriteStreamWithEvents|()', ctx);
this.topicService.updateMetadata();
this.logger.trace('%s: new TopicWriteStreamWithEvents()', ctx);
};

public async init(
ctx: Context,
args: InternalWriteStreamInitArgs
) {
this.logger.trace('%s: TopicWriteStreamWithEvents.init()', ctx);
await this.topicService.updateMetadata();
this.writeBidiStream = this.topicService.grpcServiceClient!
.makeBidiStreamRequest<Ydb.Topic.StreamWriteMessage.FromClient, Ydb.Topic.StreamWriteMessage.FromServer>(
'/Ydb.Topic.V1.TopicService/StreamWrite',
Expand Down Expand Up @@ -86,10 +92,10 @@ export class InternalTopicWriteStream {
err = TransportError.convertToYdbError(err as (Error & StatusObject));
this.events.emit('error', err);
}
this.writeBidiStream.end();
this.writeBidiStream!.end();
});
this.initRequest(ctx, args);
};
}

private initRequest(ctx: Context, args: InternalWriteStreamInitArgs) {
this.logger.trace('%s: TopicWriteStreamWithEvents.initRequest()', ctx);
Expand All @@ -107,7 +113,7 @@ export class InternalTopicWriteStream {
this.logger.trace('%s: TopicWriteStreamWithEvents.writeRequest()', ctx);
if (this.reasonForClose) throw new Error('Stream is not open');
await this.updateToken(ctx);
this.writeBidiStream.write(
this.writeBidiStream!.write(
Ydb.Topic.StreamWriteMessage.FromClient.create({
writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(args),
}));
Expand All @@ -117,7 +123,7 @@ export class InternalTopicWriteStream {
this.logger.trace('%s: TopicWriteStreamWithEvents.updateTokenRequest()', ctx);
if (this.reasonForClose) throw new Error('Stream is not open');
await this.updateToken(ctx);
this.writeBidiStream.write(
this.writeBidiStream!.write(
Ydb.Topic.StreamWriteMessage.FromClient.create({
updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(args),
}));
Expand Down
4 changes: 2 additions & 2 deletions src/topic/topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd
import {google, Ydb} from "ydb-sdk-proto";
import {InternalTopicClient} from "./internal/internal-topic-client";

// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation
// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in every jdbc operation

export type ICreateWriterArgs = {
path: string;
Expand Down Expand Up @@ -184,7 +184,7 @@ export class TopicClient {
return new TopicReader(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger);
}

// TODO: Add commit a queue - same as in writer, to confirm commits
// TODO: Add commit queue - same as in writer, to confirm commits

// @ts-ignore
public commitOffset(request: ICommitOffsetArgs): Promise<IOperationResult>;
Expand Down
3 changes: 1 addition & 2 deletions src/topic/topic-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ export class TopicReader {

private async initInnerStream(ctx: Context) {
this.logger.trace('%s: TopicReader.initInnerStream()', ctx);
this.innerReadStream = new InternalTopicReadStream(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger);

this.innerReadStream = await (await this.discovery.getTopicNodeClient()).openReadStreamWithEvents(ctx, this.readStreamArgs);
// this.innerReadStream.events.on('initResponse', async (resp) => {
// try {
// // TODO: Impl
Expand Down
2 changes: 1 addition & 1 deletion src/topic/topic-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class TopicWriter {
delete this.writeStreamArgs.getLastSeqNo;
}
delete this.firstInnerStreamInitResp;
const stream = new InternalTopicWriteStream(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger);
const stream = await (await this.discovery.getTopicNodeClient()).openWriteStreamWithEvents(ctx, this.writeStreamArgs);
stream.events.on('initResponse', (resp) => {
this.logger.trace('%s: TopicWriter.on "initResponse"', ctx);
try {
Expand Down

0 comments on commit ca9d15d

Please sign in to comment.