Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: public topic parameters in code without references to protobub… #416

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 57 additions & 18 deletions examples/topic-service/index.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,60 @@
import {Driver as YDB} from '../../src';
import {AnonymousAuthService} from '../../src/credentials/anonymous-auth-service';
import {SimpleLogger} from "../../src/logger/simple-logger";
import {Driver as YDB, getCredentialsFromEnv} from 'ydb-sdk';
import {Ydb} from "ydb-sdk-proto";
import {Context} from "../../src/context";
import {getDefaultLogger} from "../../src/logger/get-default-logger";
import {main} from "../utils";
import Codec = Ydb.Topic.Codec;
import {Context} from "ydb-sdk/build/cjs/src/context/context";

require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

async function main() {
async function run() {
const logger = getDefaultLogger();
const authService = getCredentialsFromEnv(logger);
const db = new YDB({
endpoint: ENDPOINT,
database: DATABASE,
authService: new AnonymousAuthService(),
logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
endpoint: ENDPOINT, // i.e.: grc(s)://<x.x.x.x>
database: DATABASE, // i.e.: '/local'
authService, logger
// logger: new SimpleLogger({envKey: 'YDB_TEST_LOG_LEVEL'}),
});
if (!(await db.ready(3000))) throw new Error('Driver is not ready!');
try {
await db.topic.createTopic({
path: 'demoTopic',
supportedCodecs: {
codecs: [Ydb.Topic.Codec.CODEC_RAW],
},
partitioningSettings: {
minActivePartitions: 3,
},
consumers: [{
name: 'demo',
name: 'demoConsumer',
}],
});

await db.topic.alterTopic({
path: 'demoTopic',
addConsumers: [{
name: 'anotherqDemoConsumer',
}],
setSupportedCodecs: {
codecs: [Ydb.Topic.Codec.CODEC_RAW, Codec.CODEC_GZIP],
},
});

logger.info(await db.topic.describeTopic({
path: 'demoTopic',
}));

const writer = await db.topic.createWriter({
path: 'demoTopic',
// producerId: '...', // will be genereted automatically
// messageGroupId: '...' // will be the same as producerId
getLastSeqNo: true, // seqNo will be assigned automatically
});
await writer.sendMessages({
await writer.send({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from('Hello, world'),
Expand All @@ -39,24 +63,34 @@ async function main() {
});
const promises = [];
for (let n = 0; n < 4; n++) {
promises.push(writer.sendMessages({
promises.push(writer.send({
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.from(`Message N${n}`),
uncompressedSize: `Message N${n}`.length,
metadataItems: [
{
key: 'key',
value: new TextEncoder().encode('value'),
},
{
key: 'key2',
value: new TextEncoder().encode('value2'),
}
],
}],
}));
}));
}
await writer.close();
await writer.close(); // // graceful close() - will finish after receiving confirmation that all messages have been processed by the server
// await Promise.all(promises); // another option

await Promise.all(promises);
const reader = await db.topic.createReader(Context.createNew({
timeout: 3000,
}).ctx, {
topicsReadSettings: [{
path: 'demoTopic',
}],
consumer: 'demo',
consumer: 'demoConsumer',
receiveBufferSizeInBytes: 10_000_000,
});
try {
Expand All @@ -68,10 +102,15 @@ async function main() {
if (!Context.isTimeout(err)) throw err;
console.info('Timeout is over!');
}
await reader.close(true); // graceful close() - complete when all messages are commited
await reader.close(); // graceful close() - will complete when processing of all currently processed messages will finish

await db.topic.dropTopic({
path: 'demoTopic',
});

} finally {
await db.destroy();
}
}

main();
main(run);
6 changes: 3 additions & 3 deletions src/__tests__/e2e/topic-service/internal.test.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
ReadStreamStartPartitionSessionArgs
} from "../../../topic/internal/topic-read-stream-with-events";
import {WriteStreamInitResult, WriteStreamWriteResult} from "../../../topic/internal/topic-write-stream-with-events";
import {TopicNodeClient} from "../../../topic/internal/topic-node-client";
import {TopicClientOnParticularNode} from "../../../topic/internal/topic-node-client";
import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";
Expand All @@ -24,7 +24,7 @@ const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

describe('Topic: General', () => {
let discoveryService: DiscoveryService;
let topicService: TopicNodeClient;
let topicService: TopicClientOnParticularNode;
const ctx = Context.createNew().ctx;

beforeEach(async () => {
Expand Down Expand Up @@ -191,7 +191,7 @@ describe('Topic: General', () => {
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
topicService = new TopicNodeClient(
topicService = new TopicClientOnParticularNode(
await discoveryService.getEndpoint(), // TODO: Should be one per endpoint
DATABASE,
authService,
Expand Down
2 changes: 1 addition & 1 deletion src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export class Context {
}

/**
* Makes a promise cancellable through context, if the context allows cancel or has a timeout.
* Makes a pr omise cancellable through context, if the context allows cancel or has a timeout.
*/
public cancelRace<T>(promise: Promise<T>): Promise<T> {
if (!this.onCancel) return promise;
Expand Down
4 changes: 2 additions & 2 deletions src/discovery/discovery-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {retryable} from "../retries_obsoleted";
import {getOperationPayload} from "../utils/process-ydb-operation-result";
import {AuthenticatedService, withTimeout} from "../utils";
import {Logger} from "../logger/simple-logger";
import {TopicNodeClient} from "../topic/internal/topic-node-client";
import {InternalTopicClient} from "../topic/internal/internal-topic-client";
import {IDiscoverySettings} from "../client/settings";

type FailureDiscoveryHandler = (err: Error) => void;
Expand Down Expand Up @@ -146,7 +146,7 @@ export default class DiscoveryService extends AuthenticatedService<DiscoveryServ
public async getTopicNodeClient() {
const endpoint = await this.getEndpoint();
if (!endpoint.topicNodeClient) {
endpoint.topicNodeClient = new TopicNodeClient(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions);
endpoint.topicNodeClient = new InternalTopicClient(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions);
}
return endpoint.topicNodeClient;
}
Expand Down
4 changes: 2 additions & 2 deletions src/discovery/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import IEndpointInfo = Ydb.Discovery.IEndpointInfo;
import * as grpc from "@grpc/grpc-js";
import {ISslCredentials} from "../utils/ssl-credentials";
import {ClientOptions} from "../utils";
import {TopicNodeClient} from "../topic/internal/topic-node-client";
import {InternalTopicClient} from "../topic/internal/internal-topic-client";

export type SuccessDiscoveryHandler = (result: Endpoint[]) => void;

Expand All @@ -18,7 +18,7 @@ export class Endpoint extends Ydb.Discovery.EndpointInfo {

private pessimizedAt: DateTime | null;

public topicNodeClient?: TopicNodeClient;
public topicNodeClient?: InternalTopicClient;

static fromString(host: string) {
const match = Endpoint.HOST_RE.exec(host);
Expand Down
102 changes: 52 additions & 50 deletions src/query/query-session-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,35 @@ import {CtxUnsubcribe} from "../context";
import IExecuteQueryRequest = Ydb.Query.IExecuteQueryRequest;
import IColumn = Ydb.IColumn;

export type IExecuteArgs = {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
};

export type IExecuteResult = {
resultSets: AsyncGenerator<ResultSet>,
execStats?: Ydb.TableStats.IQueryStats;
Expand Down Expand Up @@ -51,73 +80,46 @@ export const enum RowType {
* Finishes when the first data block is received or when the end of the stream is received. So if you are sure
* that the operation does not return any data, you may not process resultSets.
*/
export function execute(this: QuerySession, opts: {
/**
* SQL query / DDL etc.
*
*/
text: string,
/**
* Default value is SYNTAX_YQL_V1.
*/
syntax?: Ydb.Query.Syntax,
/**
* SQL query parameters.
*/
parameters?: { [k: string]: Ydb.ITypedValue },
txControl?: Ydb.Query.ITransactionControl,
execMode?: Ydb.Query.ExecMode,
statsMode?: Ydb.Query.StatsMode,
concurrentResultSets?: boolean,
/**
* Operation timeout in ms
*/
// timeout?: number, // TODO: that make sense to timeout one op?
/**
* Default Native.
*/
rowMode?: RowType,
idempotent?: boolean,
}): Promise<IExecuteResult> {
// Validate opts
if (!opts.text.trim()) throw new Error('"text" parameter is empty')
if (opts.parameters)
Object.keys(opts.parameters).forEach(n => {
export function execute(this: QuerySession, args: IExecuteArgs): Promise<IExecuteResult> {
// Validate args
if (!args.text.trim()) throw new Error('"text" parameter is empty')
if (args.parameters)
Object.keys(args.parameters).forEach(n => {
if (!n.startsWith('$')) throw new Error(`Parameter name must start with "$": ${n}`);
})
if (opts.txControl && this[sessionTxSettingsSymbol])
if (args.txControl && this[sessionTxSettingsSymbol])
throw new Error(CANNOT_MANAGE_TRASACTIONS_ERROR);
if (opts.txControl?.txId)
if (args.txControl?.txId)
throw new Error('Cannot contain txControl.txId because the current session transaction is used (see session.txId)');
if (this[sessionTxIdSymbol]) {
if (opts.txControl?.beginTx)
if (args.txControl?.beginTx)
throw new Error('txControl.beginTx when there\'s already an open transaction');
} else {
if (opts.txControl?.commitTx && !opts.txControl?.beginTx)
if (args.txControl?.commitTx && !args.txControl?.beginTx)
throw new Error('txControl.commitTx === true when no open transaction and there\'s no txControl.beginTx');
}

// Build params
const executeQueryRequest: IExecuteQueryRequest = {
sessionId: this.sessionId,
queryContent: {
text: opts.text,
syntax: opts.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
text: args.text,
syntax: args.syntax ?? Ydb.Query.Syntax.SYNTAX_YQL_V1,
},
execMode: opts.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
execMode: args.execMode ?? Ydb.Query.ExecMode.EXEC_MODE_EXECUTE,
};
if (opts.statsMode) executeQueryRequest.statsMode = opts.statsMode;
if (opts.parameters) executeQueryRequest.parameters = opts.parameters;
if (args.statsMode) executeQueryRequest.statsMode = args.statsMode;
if (args.parameters) executeQueryRequest.parameters = args.parameters;
if (this[sessionTxSettingsSymbol] && !this[sessionTxIdSymbol])
executeQueryRequest.txControl = {beginTx: this[sessionTxSettingsSymbol], commitTx: false};
else if (opts.txControl)
executeQueryRequest.txControl = opts.txControl;
else if (args.txControl)
executeQueryRequest.txControl = args.txControl;
if (this[sessionTxIdSymbol])
(executeQueryRequest.txControl || (executeQueryRequest.txControl = {})).txId = this[sessionTxIdSymbol];
executeQueryRequest.concurrentResultSets = opts.concurrentResultSets ?? false;
if (opts.hasOwnProperty('idempotent')) {
executeQueryRequest.concurrentResultSets = args.concurrentResultSets ?? false;
if (args.hasOwnProperty('idempotent')) {
if (this[isIdempotentDoLevelSymbol]) throw new Error('The attribute of idempotency is already set at the level of do()');
if (opts.idempotent) this[isIdempotentSymbol] = true;
if (args.idempotent) this[isIdempotentSymbol] = true;
}

// Run the operation
Expand Down Expand Up @@ -197,13 +199,13 @@ export function execute(this: QuerySession, opts: {
let resultSetTuple = resultSetByIndex[index];
if (!resultSetTuple) {
iterator = buildAsyncQueueIterator<Ydb.IValue>();
switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, partialResp.resultSet!.columns as IColumn[], args.rowMode ?? RowType.Native, iterator);
break;
default: // Native
const nativeColumnsNames = (partialResp.resultSet!.columns as IColumn[]).map(v => snakeToCamelCaseConversion.ydbToJs(v.name!));
resultSet = new ResultSet(index, nativeColumnsNames, opts.rowMode ?? RowType.Native, iterator);
resultSet = new ResultSet(index, nativeColumnsNames, args.rowMode ?? RowType.Native, iterator);
resultSet[resultsetYdbColumnsSymbol] = partialResp.resultSet!.columns as IColumn[];
}
resultSetIterator.push(resultSet);
Expand All @@ -216,7 +218,7 @@ export function execute(this: QuerySession, opts: {
[iterator, resultSet] = resultSetTuple;
}

switch (opts.rowMode) {
switch (args.rowMode) {
case RowType.Ydb:
for (const row of partialResp.resultSet!.rows!) iterator.push(row);
break;
Expand Down
Loading
Loading