Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Zorkaltsev committed Aug 14, 2024
1 parent b62a6de commit ff4e90e
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 143 deletions.
130 changes: 68 additions & 62 deletions src/__tests__/e2e/topic-service/topic-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,25 @@ describe('Topic: General', () => {
if (topicService) topicService.dispose();
});

it.only('write: simple', async () => {
let waitResolve: any, waitPromise: Promise<any>;

it('general', async () => {
await topicService.createTopic({
path: 'myTopic2',
path: 'myTopic',
});
console.info(`Service created`);

const writer = await topicService.openWriteStream({
path: 'myTopic2',
const writer = await topicService.openWriteStreamWithEvent({
path: 'myTopic',
});
writer.events.on('error', (err) => {
console.error(err);
console.error('Writer error:', err);
});
console.info(`Topic writer created`);

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
writer.events.on('initResponse', (_v) => {
waitResolve();
await stepResult(`Writer initialized`, (resolve) => {
writer.events.once('initResponse', (_v) => {
resolve(undefined);
});
});
await waitPromise;
console.info(`Writer initialized`);

await writer.writeRequest({
// tx:
Expand All @@ -62,64 +57,63 @@ describe('Topic: General', () => {
// metadataItems: // TODO: Should I use this?
}],
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
writer.events.on("writeResponse", (_v) => {
waitResolve();
await stepResult(`Message sent`, (resolve) => {
writer.events.once("writeResponse", (_v) => {
resolve(undefined);
});
});
await waitPromise;
console.info(`Message sent`);

await writer.dispose();
console.info(`Writer disposed`);
writer.close();
await stepResult(`Writer closed`, (resolve) => {
writer.events.once("end", () => {
resolve(undefined);
});
});

/////////////////////////////////////////////////
// Now read the message

const reader = await topicService.openReadStream({
readerName: 'reasder1',
consumer: 'testC',
topicsReadSettings: [{
path: 'myTopic2',
// partitionIds: [1],
}],
});
reader.events.on('error', (err) => {
console.error(err);
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
reader.events.on('initResponse', (_v) => {
waitResolve();
});
await waitPromise;
console.info(`Topic reader created`);

// reader.readRequest({
// const reader = await topicService.openReadStreamWithEvents({
// readerName: 'reader1',
// consumer: 'testC',
// topicsReadSettings: [{
// path: 'myTopic2',
// partitionIds: [1],
// }],
// });
// reader.events.on('error', (err) => {
// console.error('Reader error:', err);
// });
//
// waitPromise = new Promise((resolve) => {
// waitResolve = resolve;
// await stepResult(`Topic reader created`, (resolve) => {
// reader.events.once("initResponse", () => {
// resolve(undefined);
// });
// });
// reader.events.on('readResponse', (v) => {
// console.info(`Message read: ${v}`)
// waitResolve();
//
// await stepResult(`Start partition`, (resolve) => {
// reader.events.once('startPartitionSessionRequest', (v) => {
// console.info(`Partition: ${v}`)
// reader.startPartitionSessionResponse({
// partitionSessionId: v.partitionSession?.partitionSessionId,
// });
// resolve(undefined);
// });
// });
//
// await stepResult(`Message read`, (resolve) => {
// reader.events.once('readResponse', (v) => {
// console.info(`Message: ${v}`)
// resolve(undefined);
// });
// });
//
// reader.close();
// await stepResult(`Reader closed`, (resolve) => {
// reader.events.once("end", () => {
// resolve(undefined);
// });
// });
// await waitPromise;

await reader.dispose();
console.info(`Reader disposed`);

await topicService.dispose();
console.info(`Topic service disposed`);
});

it('read: simple', async () => {

});

async function testOnOneSessionWithoutDriver() {
Expand All @@ -140,4 +134,16 @@ describe('Topic: General', () => {
logger,
);
}

async function stepResult<T>(message: String, cb: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => T): Promise<T> {
return new Promise<T>((resolve, reject) => {
try {
cb(resolve, reject);
console.info(message);
} catch (err) {
reject(err);
console.error('Step failed:', err);
}
});
}
});
22 changes: 19 additions & 3 deletions src/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {parseConnectionString} from "./utils/parse-connection-string";
import {QueryClient} from "./query";
import {Logger} from "./logger/simple-logger";
import {getDefaultLogger} from "./logger/get-default-logger";
import {TopicService} from "./topic";

export interface IPoolSettings {
minLimit?: number;
Expand Down Expand Up @@ -38,10 +39,25 @@ export default class Driver {
private clientOptions?: ClientOptions;
private logger: Logger;
private discoveryService: DiscoveryService;
private _topicClient?: TopicService;

public tableClient: TableClient;
public queryClient: QueryClient;
public schemeClient: SchemeService;
public readonly tableClient: TableClient;
public readonly queryClient: QueryClient;
public readonly schemeClient: SchemeService;

public async getTopicClient() {
if (!this._topicClient) {
this._topicClient = new TopicService(
await this.discoveryService.getEndpoint(),
this.database,
this.authService,
this.logger,
this.sslCredentials,
this.clientOptions,
);
}
return this._topicClient;
}

constructor(settings: IDriverSettings) {
this.logger = settings.logger || getDefaultLogger();
Expand Down
5 changes: 2 additions & 3 deletions src/topic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ Notice: This API is EXPERIMENTAL and may be changed or removed in a later releas

# TODO

- Internal service implementation
- Client
- Retryer
- Update token on streams
- Promise like streams wrappers
- Update auth token on streams
- Compression
- Transactions
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,15 @@ export type ReadStreamStopPartitionSessionArgs = Ydb.Topic.StreamReadMessage.ISt
export type ReadStreamStopPartitionSessionResult = Ydb.Topic.StreamReadMessage.IStopPartitionSessionResponse;
// & Required<Pick<Ydb.Topic.StreamWriteMessage.IInitResponse, 'path'>>;

export const STREAM_DESTROYED = 'stream-destroyed';

type ReadStreamEvents = {
initResponse: (resp: ReadStreamInitResult) => void,
readResponse: (resp: ReadStreamReadResult) => void,
commitOffsetResponse: (resp: ReadStreamCommitOffsetResult) => void,
partitionSessionStatusResponse: (resp: ReadStreamPartitionSessionStatusResult) => void,
startPartitionSessionRequest: (resp: ReadStreamStartPartitionSessionArgs) => void,
stopPartitionSessionRequest: (resp: ReadStreamStopPartitionSessionArgs) => void,

updateTokenResponse: (resp: ReadStreamUpdateTokenResult) => void,

error: (err: Error) => void,
'stream-destroyed': (stream: { dispose: () => {} }) => void, // TODO: Is end is not enough
end: (cause: any) => void,
}

Expand All @@ -65,7 +60,7 @@ export const enum TopicWriteStreamState {
Closed
}

export class TopicReadStream {
export class TopicReadStreamWithEvent {
public events = new EventEmitter() as TypedEmitter<ReadStreamEvents>;

private _state: TopicWriteStreamState = TopicWriteStreamState.Init;
Expand All @@ -86,8 +81,16 @@ export class TopicReadStream {
(v: Ydb.Topic.StreamReadMessage.IFromClient) => Ydb.Topic.StreamReadMessage.FromClient.encode(v).finish() as Buffer,
Ydb.Topic.StreamReadMessage.FromServer.decode,
this.topicService.metadata);

//// Uncomment to see all events
const stream = this.readBidiStream;
const oldEmit = stream.emit;
stream.emit = ((...args) => {
console.info('read event:', args);
return oldEmit.apply(stream, args as unknown as ['readable']);
}) as typeof oldEmit;

this.readBidiStream.on('data', (value) => {
console.info(2000, value)
try {
YdbError.checkStatus(value!)
} catch (err) {
Expand All @@ -108,85 +111,77 @@ export class TopicReadStream {
if (TransportError.isMember(err)) err = TransportError.convertToYdbError(err);
this.events.emit('error', err);
})
// this.writeBidiStream.on('status', (v) => {
// console.info(8200, v);
// })
// this.writeBidiStream.on('metadata', (v) => {
// console.info(8000, v);
// })
// this.writeBidiStream.on('finish', (v: any) => {
// console.info(8060, v);
// })

this.readBidiStream.on('end', () => {
this._state = TopicWriteStreamState.Closed;
delete this.readBidiStream; // so there was no way to send more messages
});
this.initRequest(opts);
};

private initRequest(opts: ReadStreamInitArgs) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
this.readBidiStream.write(
this.readBidiStream!.write(
Ydb.Topic.StreamReadMessage.create({
initRequest: Ydb.Topic.StreamReadMessage.InitRequest.create(opts),
}));
}

public readRequest(opts: ReadStreamReadArgs) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
readRequest: Ydb.Topic.StreamReadMessage.ReadRequest.create(opts),
}));
}

public commitOffsetRequest(opts: ReadStreamCommitOffsetArgs) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
commitOffsetRequest: Ydb.Topic.StreamReadMessage.CommitOffsetRequest.create(opts),
}));
}

public partitionSessionStatusRequest(opts: ReadStreamPartitionSessionStatusArgs) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
partitionSessionStatusRequest: Ydb.Topic.StreamReadMessage.PartitionSessionStatusRequest.create(opts),
}));
}

public updateTokenRequest(opts: ReadStreamUpdateTokenArgs) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(opts),
}));
}

public startPartitionSessionResponse(opts: ReadStreamStartPartitionSessionResult) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
startPartitionSessionResponse: Ydb.Topic.StreamReadMessage.StartPartitionSessionResponse.create(opts),
}));
}

public stopPartitionSessionResponse(opts: ReadStreamStopPartitionSessionResult) {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this.readBidiStream.write(
Ydb.Topic.StreamReadMessage.FromClient.create({
stopPartitionSessionResponse: Ydb.Topic.StreamReadMessage.StopPartitionSessionResponse.create(opts),
}));
}

public async close() {
if (!this.readBidiStream) throw new Error('Stream is not opened')
if (!this.readBidiStream) throw new Error('Stream is closed')
this._state = TopicWriteStreamState.Closing;
this.readBidiStream.end();
delete this.readBidiStream; // so there was no way to send more messages
// TODO: Is there a way to keep waiting for later ACKs?
}

public async dispose() {
await this.close();
this.events.emit(STREAM_DESTROYED, this);
this._state = TopicWriteStreamState.Closed;
}

Expand Down
Loading

0 comments on commit ff4e90e

Please sign in to comment.