Skip to content

Commit

Permalink
chore: initial MVP of Topic service
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Zorkaltsev committed Jul 23, 2024
1 parent ec5aef0 commit 0e1de98
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 123 deletions.
112 changes: 112 additions & 0 deletions src/__tests__/e2e/topic-service/non-jest-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
import {TopicService} from "../../../topic";
// @ts-ignore
import {google, Ydb} from "ydb-sdk-proto";

const DATABASE = '/local';
const ENDPOINT = 'grpc://localhost:2136';

async function testOnOneSessionWithoutDriver() {
const logger = getDefaultLogger();
const authService = new AnonymousAuthService();
const discoveryService = new DiscoveryService({
endpoint: ENDPOINT,
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
const topicService = new TopicService(
await discoveryService.getEndpoint(), // TODO: Should be one per endpoint
DATABASE,
authService,
logger,
);
return topicService;
}

(async () => {
const topicService = await testOnOneSessionWithoutDriver();

console.info(1000);
await topicService.createTopic({
path: 'MyTopic',
});

const writer = await topicService.openWriteStream({
path: 'MyTopic',
});

// @ts-ignore
let waitResolve: any;
// @ts-ignore
let waitPromise = new Promise((resolve) => {
waitResolve = resolve;
})

writer.events.on('initResponse', (v) => {
console.info(3900, v);
waitResolve();
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
})
writer.events.on("writeResponse", (v) => {
console.info(4000, v);
// waitResolve();
})

writer.writeRequest({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.alloc(1000, 'test messsage'),
uncompressedSize: 'test messsage'.length,
seqNo: 1,
createdAt: google.protobuf.Timestamp.create({
seconds: Date.now() / 1000,
nanos: Date.now() % 1000,
}),
messageGroupId: 'abc', // TODO: Check examples
partitionId: 1,
// metadataItems: // TODO: Should I use this?
}],
});

console.info(4100);

// await new Promise((resolve) => setTimeout(resolve, 4_000));

// console.info(1300);
//
await writer.dispose();

// -----

const reader = await topicService.openReadStream({
readerName: 'reasder1',
consumer: 'testConsumer',
topicsReadSettings: [{
path: 'MyTopic',
partitionIds: [1],
}],
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
})
reader.events.on("readResponse", (data) => {
console.info(`Read from myTopic ${JSON.stringify(data, null, 2)}`);
waitResolve();
});

await waitPromise;

await reader.dispose();

console.info(`Test completed`);
})();
120 changes: 87 additions & 33 deletions src/__tests__/e2e/topic-service/topic-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {TopicService} from "../../../topic";
// import {google, Ydb} from "ydb-sdk-proto";
import {google, Ydb} from "ydb-sdk-proto";

const DATABASE = '/local';
const ENDPOINT = 'grpc://localhost:2136';
Expand All @@ -22,46 +22,100 @@ describe('Topic: General', () => {
});

it.only('write: simple', async () => {
console.info(1000);
let waitResolve: any, waitPromise: Promise<any>;

await topicService.createTopic({
path: 'MyTopic',
path: 'myTopic2',
});
console.info(`Service created`);

const writer = await topicService.openWriteStream({
path: 'MyTopic',
path: 'myTopic2',
});
writer.events.on('error', (err) => {
console.error(err);
});
console.info(`Topic writer created`);

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
writer.events.on('initResponse', (_v) => {
waitResolve();
});
await waitPromise;
console.info(`Writer initialized`);

// expect()

// writer.events.on('initResponse', (resp) => {
// resp.
// })

// writer.write(Ydb.Topic.StreamWriteMessage.WriteRequest.create({
// messages: [
// Ydb.Topic.StreamWriteMessage.WriteRequest.MessageData.create({
// seqNo: 1,
// createdAt: google.protobuf.Timestamp.create({
// seconds: 100,
// nanos: 0,
// }),
// // metadataItems: [
// // Ydb.Topic.MetadataItem.create({
// // key: 'a',
// // value: [0, 1],
// // }),
// // ]
// // uncompressedSize: 100,
// // data: new Buffer(),
// }),
// ],
// }));

await new Promise((resolve) => setTimeout(resolve, 4_000));

console.info(1300);
await writer.writeRequest({
// tx:
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.alloc(100, '1234567890'),
uncompressedSize: '1234567890'.length,
seqNo: 1,
createdAt: google.protobuf.Timestamp.create({
seconds: Date.now() / 1000,
nanos: Date.now() % 1000,
}),
messageGroupId: 'abc', // TODO: Check examples
partitionId: 1,
// metadataItems: // TODO: Should I use this?
}],
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
writer.events.on("writeResponse", (_v) => {
waitResolve();
});
await waitPromise;
console.info(`Message sent`);

await writer.dispose();
console.info(`Writer disposed`);

/////////////////////////////////////////////////
// Now read the message

const reader = await topicService.openReadStream({
readerName: 'reasder1',
consumer: 'testC',
topicsReadSettings: [{
path: 'myTopic2',
// partitionIds: [1],
}],
});
reader.events.on('error', (err) => {
console.error(err);
});

waitPromise = new Promise((resolve) => {
waitResolve = resolve;
});
reader.events.on('initResponse', (_v) => {
waitResolve();
});
await waitPromise;
console.info(`Topic reader created`);

// reader.readRequest({
// });
//
// waitPromise = new Promise((resolve) => {
// waitResolve = resolve;
// });
// reader.events.on('readResponse', (v) => {
// console.info(`Message read: ${v}`)
// waitResolve();
// });
// await waitPromise;

await reader.dispose();
console.info(`Reader disposed`);

await topicService.dispose();
console.info(`Topic service disposed`);
});

it('read: simple', async () => {
Expand Down
Loading

0 comments on commit 0e1de98

Please sign in to comment.