diff --git a/examples/topic-service/index.ts b/examples/topic-service/index.ts index d5551f63..5195fc59 100644 --- a/examples/topic-service/index.ts +++ b/examples/topic-service/index.ts @@ -1,24 +1,18 @@ -import {Driver as YDB, getCredentialsFromEnv, Context} from 'ydb-sdk'; +import {Driver as YDB, getCredentialsFromEnv, Context, SimpleLogger} from 'ydb-sdk'; import {Ydb} from "ydb-sdk-proto"; -import {getDefaultLogger} from "../../src/logger/get-default-logger"; -// import {main} from "../utils"; import Codec = Ydb.Topic.Codec; require('dotenv').config(); -const DATABASE = '/local'; -const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136'; - async function run() { - const logger = getDefaultLogger(); + // const logger = getDefaultLogger(); + const logger = new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}); const authService = getCredentialsFromEnv(logger); const db = new YDB({ - endpoint: ENDPOINT, // i.e.: grc(s):// - database: DATABASE, // i.e.: '/local' - authService, logger - // logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}), + connectionString: process.env.YDB_CONNECTION_STRING || 'grpc://localhost:2136?database=/local', + authService, logger, }); - if (!(await db.ready(3000))) throw new Error('Driver is not ready!'); + if (!(await db.ready(30000))) throw new Error('Driver is not ready!'); try { await db.topic.createTopic({ path: 'demoTopic', @@ -43,9 +37,9 @@ async function run() { }, }); - logger.info(await db.topic.describeTopic({ + await db.topic.describeTopic({ path: 'demoTopic', - })); + }); const writer = await db.topic.createWriter({ path: 'demoTopic', diff --git a/src/errors.ts b/src/errors.ts index 1cef9d4b..bf105a37 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -293,9 +293,11 @@ export class TransportError extends YdbError { } catch (error) {} return new Error(`Unexpected transport error code ${e.code}! Error itself: ${errStr}`); } else { - return new ErrCls( + const ydbErr = new ErrCls( `${ErrCls.name} (code ${ErrCls.status}): ${e.name}: ${e.message}. ${e.details}`, ); + ydbErr.stack = e.stack; + return ydbErr; } } } diff --git a/src/index.ts b/src/index.ts index 7d8ca493..1112b559 100644 --- a/src/index.ts +++ b/src/index.ts @@ -86,5 +86,7 @@ export {RemoveDirectorySettings} from "./schema/scheme-service"; export {MakeDirectorySettings} from "./schema/scheme-service"; export {ParsedConnectionString, parseConnectionString} from "./utils/parse-connection-string"; - export {QueryClient, ResultSet, RowType} from "./query"; + +export {SimpleLogger} from "./logger/simple-logger"; +export {getDefaultLogger} from "./logger/get-default-logger"; diff --git a/src/topic/internal/internal-topic-client.ts b/src/topic/internal/internal-topic-client.ts index 9dc56af8..a474c478 100644 --- a/src/topic/internal/internal-topic-client.ts +++ b/src/topic/internal/internal-topic-client.ts @@ -64,6 +64,7 @@ export class InternalTopicClient extends AuthenticatedService 0) { destroyPromise = new Promise((resolve) => { @@ -78,13 +79,15 @@ export class InternalTopicClient extends AuthenticatedService) { + this.logger.trace('%s: InternalTopicClient.openWriteStreamWithEvents()', ctx); if (args.producerId === undefined || args.producerId === null) { const newGUID = uuid_v4(); args = {...args, producerId: newGUID, messageGroupId: newGUID} } else if (args.messageGroupId === undefined || args.messageGroupId === null) { args = {...args, messageGroupId: args.producerId}; } - const writerStream = new InternalTopicWriteStream(ctx, args, this, this.logger); + const writerStream = new InternalTopicWriteStream(ctx, this, this.logger); + await writerStream.init(ctx, args); writerStream.events.once('end', () => { const index = this.allStreams.findIndex(v => v === writerStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -95,7 +98,9 @@ export class InternalTopicClient extends AuthenticatedService { const index = this.allStreams.findIndex(v => v === readStream) if (index >= 0) this.allStreams.splice(index, 1); @@ -105,31 +110,38 @@ export class InternalTopicClient extends AuthenticatedService void, } -export const enum TopicWriteStreamState { - Init, - Active, - Closing, - Closed -} - export class InternalTopicReadStream { public events = new EventEmitter() as TypedEmitter; @@ -61,12 +54,15 @@ export class InternalTopicReadStream { constructor( ctx: Context, - args: InternalReadStreamInitArgs, private topicService: InternalTopicClient, // @ts-ignore public readonly logger: Logger) { this.logger.trace('%s: new TopicReadStreamWithEvents()', ctx); - this.topicService.updateMetadata(); + }; + + public async init(ctx: Context, args: InternalReadStreamInitArgs) { + this.logger.trace('%s: InternalTopicReadStream.init()', ctx); + await this.topicService.updateMetadata(); this.readBidiStream = this.topicService.grpcServiceClient! .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamRead', @@ -111,9 +107,10 @@ export class InternalTopicReadStream { } }) this.initRequest(ctx, args); - }; - private initRequest(ctx: Context, args: InternalReadStreamInitArgs) { + } + + public initRequest(ctx: Context, args: InternalReadStreamInitArgs) { this.logger.trace('%s: TopicReadStreamWithEvents.initRequest()', ctx); this.readBidiStream!.write( Ydb.Topic.StreamReadMessage.create({ diff --git a/src/topic/internal/internal-topic-write-stream.ts b/src/topic/internal/internal-topic-write-stream.ts index a78ab8bf..1a4ea109 100644 --- a/src/topic/internal/internal-topic-write-stream.ts +++ b/src/topic/internal/internal-topic-write-stream.ts @@ -38,18 +38,24 @@ export type WriteStreamEvents = { export class InternalTopicWriteStream { private reasonForClose?: Error; - private writeBidiStream: ClientDuplexStream; + private writeBidiStream?: ClientDuplexStream; public readonly events = new EventEmitter() as TypedEmitter; constructor( ctx: Context, - args: InternalWriteStreamInitArgs, private topicService: InternalTopicClient, // @ts-ignore private logger: Logger) { - this.logger.trace('%s: new TopicWriteStreamWithEvents|()', ctx); - this.topicService.updateMetadata(); + this.logger.trace('%s: new TopicWriteStreamWithEvents()', ctx); + }; + + public async init( + ctx: Context, + args: InternalWriteStreamInitArgs + ) { + this.logger.trace('%s: TopicWriteStreamWithEvents.init()', ctx); + await this.topicService.updateMetadata(); this.writeBidiStream = this.topicService.grpcServiceClient! .makeBidiStreamRequest( '/Ydb.Topic.V1.TopicService/StreamWrite', @@ -86,10 +92,10 @@ export class InternalTopicWriteStream { err = TransportError.convertToYdbError(err as (Error & StatusObject)); this.events.emit('error', err); } - this.writeBidiStream.end(); + this.writeBidiStream!.end(); }); this.initRequest(ctx, args); - }; + } private initRequest(ctx: Context, args: InternalWriteStreamInitArgs) { this.logger.trace('%s: TopicWriteStreamWithEvents.initRequest()', ctx); @@ -107,7 +113,7 @@ export class InternalTopicWriteStream { this.logger.trace('%s: TopicWriteStreamWithEvents.writeRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); - this.writeBidiStream.write( + this.writeBidiStream!.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(args), })); @@ -117,7 +123,7 @@ export class InternalTopicWriteStream { this.logger.trace('%s: TopicWriteStreamWithEvents.updateTokenRequest()', ctx); if (this.reasonForClose) throw new Error('Stream is not open'); await this.updateToken(ctx); - this.writeBidiStream.write( + this.writeBidiStream!.write( Ydb.Topic.StreamWriteMessage.FromClient.create({ updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(args), })); diff --git a/src/topic/topic-client.ts b/src/topic/topic-client.ts index 040aa7d8..52139aac 100644 --- a/src/topic/topic-client.ts +++ b/src/topic/topic-client.ts @@ -6,7 +6,7 @@ import {asIdempotentRetryableLambda} from "../retries/asIdempotentRetryableLambd import {google, Ydb} from "ydb-sdk-proto"; import {InternalTopicClient} from "./internal/internal-topic-client"; -// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in eve+ry jdbc operation +// TODO: Consider support for "operationParams?: (Ydb.Operations.IOperationParams|null);". It presents in every jdbc operation export type ICreateWriterArgs = { path: string; @@ -184,7 +184,7 @@ export class TopicClient { return new TopicReader(ctx, args, this.settings.retrier, this.settings.discoveryService, this.settings.logger); } - // TODO: Add commit a queue - same as in writer, to confirm commits + // TODO: Add commit queue - same as in writer, to confirm commits // @ts-ignore public commitOffset(request: ICommitOffsetArgs): Promise; diff --git a/src/topic/topic-reader.ts b/src/topic/topic-reader.ts index e9e021b6..522c9cbd 100644 --- a/src/topic/topic-reader.ts +++ b/src/topic/topic-reader.ts @@ -157,8 +157,7 @@ export class TopicReader { private async initInnerStream(ctx: Context) { this.logger.trace('%s: TopicReader.initInnerStream()', ctx); - this.innerReadStream = new InternalTopicReadStream(ctx, this.readStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); - + this.innerReadStream = await (await this.discovery.getTopicNodeClient()).openReadStreamWithEvents(ctx, this.readStreamArgs); // this.innerReadStream.events.on('initResponse', async (resp) => { // try { // // TODO: Impl diff --git a/src/topic/topic-writer.ts b/src/topic/topic-writer.ts index 249d9b65..c40c970c 100644 --- a/src/topic/topic-writer.ts +++ b/src/topic/topic-writer.ts @@ -107,7 +107,7 @@ export class TopicWriter { delete this.writeStreamArgs.getLastSeqNo; } delete this.firstInnerStreamInitResp; - const stream = new InternalTopicWriteStream(ctx, this.writeStreamArgs, await this.discovery.getTopicNodeClient(), this.logger); + const stream = await (await this.discovery.getTopicNodeClient()).openWriteStreamWithEvents(ctx, this.writeStreamArgs); stream.events.on('initResponse', (resp) => { this.logger.trace('%s: TopicWriter.on "initResponse"', ctx); try {