Skip to content

Commit

Permalink
Merge pull request #416 from ydb-platform/topic
Browse files Browse the repository at this point in the history
chore: public topic parameters in code without references to protobub…
  • Loading branch information
Alexey Zorkaltsev authored Oct 9, 2024
2 parents 1b47ccf + f185590 commit 3c920b8
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 250 deletions.
75 changes: 57 additions & 18 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,60 @@
import {Driver as YDB} from '../../src';
import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service';
import {SimpleLogger} from "../../src/logger/simple-logger";
import {Driver as YDB, getCredentialsFromEnv} from 'ydb-sdk';
import {Ydb} from "ydb-sdk-proto";
import {Context} from "../../src/context";
import {getDefaultLogger} from "../../src/logger/get-default-logger";
import {main} from "../utils";
import Codec = Ydb.Topic.Codec;
import {Context} from "ydb-sdk/build/cjs/src/context/context";

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function main() {
async function run() {
const logger = getDefaultLogger();
const authService = getCredentialsFromEnv(logger);
const db = new YDB({
endpoint: ENDPOINT,
database: DATABASE,
authService: new AnonymousAuthService(),
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
endpoint: ENDPOINT, // i.e.: grc(s)://<x.x.x.x>
database: DATABASE, // i.e.: '/local'
authService, logger
// logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
try {
await db.topic.createTopic({
path: 'demoTopic',
supportedCodecs: {
codecs: [Ydb.Topic.Codec.CODEC_RAW],
},
partitioningSettings: {
minActivePartitions: 3,
},
consumers: [{
name: 'demo',
name: 'demoConsumer',
}],
});

await db.topic.alterTopic({
path: 'demoTopic',
addConsumers: [{
name: 'anotherqDemoConsumer',
}],
setSupportedCodecs: {
codecs: [Ydb.Topic.Codec.CODEC_RAW, Codec.CODEC_GZIP],
},
});

logger.info(await db.topic.describeTopic({
path: 'demoTopic',
}));

const writer = await db.topic.createWriter({
path: 'demoTopic',
// producerId: '...', // will be genereted automatically
// messageGroupId: '...' // will be the same as producerId
getLastSeqNo: true, // seqNo will be assigned automatically
});
await writer.sendMessages({
await writer.send({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from('Hello, world'),
Expand All @@ -39,24 +63,34 @@ async function main() {
});
const promises = [];
for (let n = 0; n < 4; n++) {
promises.push(writer.sendMessages({
promises.push(writer.send({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from(`Message N${n}`),
uncompressedSize: `Message N${n}`.length,
metadataItems: [
{
key: 'key',
value: new TextEncoder().encode('value'),
},
{
key: 'key2',
value: new TextEncoder().encode('value2'),
}
],
}],
}));
}));
}
await writer.close();
await writer.close(); // // graceful close() - will finish after receiving confirmation that all messages have been processed by the server
// await Promise.all(promises); // another option

await Promise.all(promises);
const reader = await db.topic.createReader(Context.createNew({
timeout: 3000,
}).ctx, {
topicsReadSettings: [{
path: 'demoTopic',
}],
consumer: 'demo',
consumer: 'demoConsumer',
receiveBufferSizeInBytes: 10_000_000,
});
try {
Expand All @@ -68,10 +102,15 @@ async function main() {
if (!Context.isTimeout(err)) throw err;
console.info('Timeout is over!');
}
await reader.close(true); // graceful close() - complete when all messages are commited
await reader.close(); // graceful close() - will complete when processing of all currently processed messages will finish

await db.topic.dropTopic({
path: 'demoTopic',
});

} finally {
await db.destroy();
}
}

main();
main(run);
6 changes: 3 additions & 3 deletions src/__tests__/e2e/topic-service/internal.test.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
ReadStreamStartPartitionSessionArgs
} from "../../../topic/internal/topic-read-stream-with-events";
import {WriteStreamInitResult, WriteStreamWriteResult} from "../../../topic/internal/topic-write-stream-with-events";
import {TopicNodeClient} from "../../../topic/internal/topic-node-client";
import {TopicClientOnParticularNode} from "../../../topic/internal/topic-node-client";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";
Expand All @@ -24,7 +24,7 @@ const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

describe('Topic: General', () => {
let discoveryService: DiscoveryService;
let topicService: TopicNodeClient;
let topicService: TopicClientOnParticularNode;
const ctx = Context.createNew().ctx;

beforeEach(async () => {
Expand Down Expand Up @@ -191,7 +191,7 @@ describe('Topic: General', () => {
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
topicService = new TopicNodeClient(
topicService = new TopicClientOnParticularNode(
await discoveryService.getEndpoint(), // TODO: Should be one per endpoint
DATABASE,
authService,
Expand Down
2 changes: 1 addition & 1 deletion src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export class Context {
}

/**
* Makes a promise cancellable through context, if the context allows cancel or has a timeout.
* Makes a pr omise cancellable through context, if the context allows cancel or has a timeout.
*/
public cancelRace<T>(promise: Promise<T>): Promise<T> {
if (!this.onCancel) return promise;
Expand Down
4 changes: 2 additions & 2 deletions src/discovery/discovery-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {retryable} from "../retries_obsoleted";
import {getOperationPayload} from "../utils/process-ydb-operation-result";
import {AuthenticatedService, withTimeout} from "../utils";
import {Logger} from "../logger/simple-logger";
import {TopicNodeClient} from "../topic/internal/topic-node-client";
import {InternalTopicClient} from "../topic/internal/internal-topic-client";
import {IDiscoverySettings} from "../client/settings";

type FailureDiscoveryHandler = (err: Error) => void;
Expand Down Expand Up @@ -146,7 +146,7 @@ export default class DiscoveryService extends AuthenticatedService<DiscoveryServ
public async getTopicNodeClient() {
const endpoint = await this.getEndpoint();
if (!endpoint.topicNodeClient) {
endpoint.topicNodeClient = new TopicNodeClient(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions);
endpoint.topicNodeClient = new InternalTopicClient(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions);
}
return endpoint.topicNodeClient;
}
Expand Down
4 changes: 2 additions & 2 deletions src/discovery/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import IEndpointInfo = Ydb.Discovery.IEndpointInfo;
import * as grpc from "@grpc/grpc-js";
import {ISslCredentials} from "../utils/ssl-credentials";
import {ClientOptions} from "../utils";
import {TopicNodeClient} from "../topic/internal/topic-node-client";
import {InternalTopicClient} from "../topic/internal/internal-topic-client";

export type SuccessDiscoveryHandler = (result: Endpoint[]) => void;

Expand All @@ -18,7 +18,7 @@ export class Endpoint extends Ydb.Discovery.EndpointInfo {

private pessimizedAt: DateTime | null;

public topicNodeClient?: TopicNodeClient;
public topicNodeClient?: InternalTopicClient;

static fromString(host: string) {
const match = Endpoint.HOST_RE.exec(host);
Expand Down
102 changes: 52 additions & 50 deletions src/query/query-session-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ import {CtxUnsubcribe} from "../context";
import IExecuteQueryRequest = Ydb.Query.IExecuteQueryRequest;
import IColumn = Ydb.IColumn;

export type IExecuteArgs = {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
};

export type IExecuteResult = {
resultSets: AsyncGenerator<ResultSet>,
execStats?: Ydb.TableStats.IQueryStats;
Expand Down Expand Up @@ -51,73 +80,46 @@ export const enum RowType {
* Finishes when the first data block is received or when the end of the stream is received. So if you are sure
* that the operation does not return any data, you may not process resultSets.
*/
export function execute(this: QuerySession, opts: {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
}): Promise<IExecuteResult> {
// Validate opts
if (!opts.text.trim()) throw new Error('"text" parameter is empty')
if (opts.parameters)
Object.keys(opts.parameters).forEach(n => {
export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecuteResult> {
// Validate args
if (!args.text.trim()) throw new Error('"text" parameter is empty')
if (args.parameters)
Object.keys(args.parameters).forEach(n => {
if (!n.startsWith('$')) throw new Error(`Parameter name must start with "$": ${n}`);
})
if (opts.txControl && this[sessionTxSettingsSymbol])
if (args.txControl && this[sessionTxSettingsSymbol])
throw new Error(CANNOT_MANAGE_TRASACTIONS_ERROR);
if (opts.txControl?.txId)
if (args.txControl?.txId)
throw new Error('Cannot contain txControl.txId because the current session transaction is used (see session.txId)');
if (this[sessionTxIdSymbol]) {
if (opts.txControl?.beginTx)
if (args.txControl?.beginTx)
throw new Error('txControl.beginTx when there\'s already an open transaction');
} else {
if (opts.txControl?.commitTx && !opts.txControl?.beginTx)
if (args.txControl?.commitTx && !args.txControl?.beginTx)
throw new Error('txControl.commitTx === true when no open transaction and there\'s no txControl.beginTx');
}

// Build params
const executeQueryRequest: IExecuteQueryRequest = {
sessionId: this.sessionId,
queryContent: {
text: opts.text,
syntax: opts.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
text: args.text,
syntax: args.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
},
execMode: opts.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
execMode: args.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
};
if (opts.statsMode) executeQueryRequest.statsMode = opts.statsMode;
if (opts.parameters) executeQueryRequest.parameters = opts.parameters;
if (args.statsMode) executeQueryRequest.statsMode = args.statsMode;
if (args.parameters) executeQueryRequest.parameters = args.parameters;
if (this[sessionTxSettingsSymbol] && !this[sessionTxIdSymbol])
executeQueryRequest.txControl = {beginTx: this[sessionTxSettingsSymbol], commitTx: false};
else if (opts.txControl)
executeQueryRequest.txControl = opts.txControl;
else if (args.txControl)
executeQueryRequest.txControl = args.txControl;
if (this[sessionTxIdSymbol])
(executeQueryRequest.txControl || (executeQueryRequest.txControl = {})).txId = this[sessionTxIdSymbol];
executeQueryRequest.concurrentResultSets = opts.concurrentResultSets ?? false;
if (opts.hasOwnProperty('idempotent')) {
executeQueryRequest.concurrentResultSets = args.concurrentResultSets ?? false;
if (args.hasOwnProperty('idempotent')) {
if (this[isIdempotentDoLevelSymbol]) throw new Error('The attribute of idempotency is already set at the level of do()');
if (opts.idempotent) this[isIdempotentSymbol] = true;
if (args.idempotent) this[isIdempotentSymbol] = true;
}

// Run the operation
Expand Down Expand Up @@ -197,13 +199,13 @@ export function execute(this: QuerySession, opts: {
let resultSetTuple = resultSetByIndex[index];
if (!resultSetTuple) {
iterator = buildAsyncQueueIterator<Ydb.IValue>();
switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], args.rowMode ?? RowType.Native, iterator);
break;
default: // Native
const nativeColumnsNames = (partialResp.resultSet!.columns as IColumn[]).map(v => snakeToCamelCaseConversion.ydbToJs(v.name!));
resultSet = new ResultSet(index, nativeColumnsNames, opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, nativeColumnsNames, args.rowMode ?? RowType.Native, iterator);
resultSet[resultsetYdbColumnsSymbol] = partialResp.resultSet!.columns as IColumn[];
}
resultSetIterator.push(resultSet);
Expand All @@ -216,7 +218,7 @@ export function execute(this: QuerySession, opts: {
[iterator, resultSet] = resultSetTuple;
}

switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
for (const row of partialResp.resultSet!.rows!) iterator.push(row);
break;
Expand Down
Loading

0 comments on commit 3c920b8

Please sign in to comment.