Skip to content

Commit

Permalink
Merge branch 'main' into topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Zorkaltsev authored Sep 30, 2024
2 parents 03fc7f9 + b09086f commit 924770a
Show file tree
Hide file tree
Showing 26 changed files with 197 additions and 27 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ jobs:

services:
ydb:
# image: ghcr.io/ydb-platform/local-ydb:nightly
image: ydbplatform/local-ydb:24.1
ports:
- 2135:2135
Expand Down
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

## [5.4.0](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.5...v5.4.0) (2024-09-27)


### Features

* add idempotent option to tableClient session.executeQuery ([89df57e](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/89df57eeb39a322af7dfbe84ce25c20549cce70b))

## [5.3.5](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.4...v5.3.5) (2024-09-26)


### Bug Fixes

* "Call cancelled" error ([ee14b9f](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/ee14b9f386059a97b41ebe417fab905ea72b18ba))

## [5.3.4](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.3...v5.3.4) (2024-08-20)


### Bug Fixes

* initial version of topic service client ([2db4f42](https://github.com/ydb-platform/ydb-nodejs-sdk/commit/2db4f424f8da9f8751a7efe847eb8cad3a02633d))

## [5.3.3](https://github.com/ydb-platform/ydb-nodejs-sdk/compare/v5.3.2...v5.3.3) (2024-07-04)


Expand Down
3 changes: 1 addition & 2 deletions examples/basic-example-v2-with-query-service/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ async function selectWithParameters(driver: Driver, data: ThreeIds[], logger: Lo
DECLARE $seasonId AS Uint64;
DECLARE $episodeId AS Uint64;
SELECT title,|
air_date
SELECT title, air_date
FROM episodes
WHERE series_id = $seriesId
AND season_id = $seasonId
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ydb-sdk",
"version": "5.3.3",
"version": "5.4.0",
"description": "Node.js bindings for working with YDB API over gRPC",
"main": "build/cjs/src/index.js",
"module": "build/esm/src/index.js",
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import {initDriver, destroyDriver} from "../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

describe('Connection', () => {
it('Test GRPC connection', async () => {
let driver = await initDriver({endpoint: process.env.YDB_ENDPOINT || 'grpc://localhost:2136'});
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/query-service/method-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import ExecMode = Ydb.Query.ExecMode;
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';
const TABLE_NAME = 'test_table_1'
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/query-service/query-service-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import fs from "fs";
import {AUTO_TX} from "../../../table";
import {QuerySession, IExecuteResult} from "../../../query";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/query-service/rows-conversion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';
const TABLE_NAME = 'test_table_3'
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/query-service/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

Expand Down
3 changes: 1 addition & 2 deletions src/__tests__/e2e/retries.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
import Driver from '../../driver';
import {
Aborted,
Expand Down Expand Up @@ -26,7 +25,7 @@ import {pessimizable} from "../../utils";
import {destroyDriver, initDriver} from "../../utils/test";
import {LogLevel, SimpleLogger} from "../../logger/simple-logger";

const MAX_RETRIES = 3;
if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const logger = new SimpleLogger({level: LogLevel.error});
class ErrorThrower {
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/alter-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
} from "../../../table";
import {initDriver, destroyDriver} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const getTableName = () => `table_alter_${Math.trunc(1000 * Math.random())}`;

describe('Alter table', () => {
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/bulk-upsert.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import Driver from '../../../driver';
import {TableSession} from "../../../table";
import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

async function readTable(session: TableSession): Promise<Row[]> {
const rows: Row[] = [];

Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/bytestring-identity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {withRetries} from '../../../retries_obsoleted';
import {Column, TableSession, TableDescription} from "../../../table";
import {initDriver, destroyDriver, TABLE} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

async function createTable(session: TableSession) {
await session.dropTable(TABLE);
await session.createTable(
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/create-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {Ydb} from 'ydb-sdk-proto';
import {Column, DescribeTableSettings, TableDescription} from "../../../table";
import {initDriver, destroyDriver} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const getTableName = () => `table_create_${Math.trunc(100000 * Math.random())}`;

describe('Create table', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {initDriver, destroyDriver} from "../../../utils/test";

const SHUTDOWN_URL = process.env.YDB_SHUTDOWN_URL || 'http://localhost:8765/actors/kqp_proxy?force_shutdown=all';

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

describe('Graceful session close', () => {

// TODO: Fix and enable test nce issue will be resolved https://github.com/ydb-platform/ydb/issues/2981
Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/read-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {TypedValues, TypedData} from '../../../types';
import {ReadTableSettings, TableSession} from "../../../table";
import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

async function readTable(session: TableSession, settings: ReadTableSettings): Promise<TypedData[]> {
const rows: TypedData[] = [];

Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/scan-query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {TypedData} from '../../../types';
import {TableSession} from "../../../table";
import {Row, initDriver, destroyDriver, createTable, fillTableWithData, TABLE} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

async function executeScanQuery(session: TableSession): Promise<TypedData[]> {
const query = `SELECT * FROM ${TABLE};`;

Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/table-service/types.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {TypedData, TypedValues, Types} from '../../../types';
import NullValue = google.protobuf.NullValue;
import {initDriver, destroyDriver} from "../../../utils/test";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

describe('Types', () => {
let driver: Driver;

Expand Down
2 changes: 2 additions & 0 deletions src/__tests__/e2e/topic-service/internal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {Context} from "../../../context";
import {RetryParameters} from "../../../retries/retryParameters";
import {RetryStrategy} from "../../../retries/retryStrategy";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();

const DATABASE = '/local';
const ENDPOINT = process.env.YDB_ENDPOINT || 'grpc://localhost:2136';

Expand Down
95 changes: 93 additions & 2 deletions src/__tests__/e2e/topic-service/send-messages.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,100 @@
import {AnonymousAuthService, Driver as YDB} from '../../../index';
import {google, Ydb} from "ydb-sdk-proto";

if (process.env.TEST_ENVIRONMENT === 'dev') require('dotenv').config();
// import {AnonymousAuthService, Driver as YDB} from '../../../index';
// import {google, Ydb} from "ydb-sdk-proto";

// create topic

xdescribe('Topic: Send messages', () => {
let ydb: YDB | undefined;

beforeEach(async () => {
ydb = new YDB({
connectionString: 'grpc://localhost:2136/?database=local',
authService: new AnonymousAuthService(),
});
});

afterEach(async () => {
if (ydb) {
await ydb.destroy();
ydb = undefined;
}
});

it('General', async () => {
const topicClient = await ydb!.topic;

await topicClient.createTopic({
path: 'testTopic'
});

const writer = await topicClient.createWriter({
path: 'testTopic'
});

const res1 = await writer.sendMessages({
// tx:
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.alloc(10, '1234567890'),
uncompressedSize: '1234567890'.length,
seqNo: 1,
createdAt: google.protobuf.Timestamp.create({
seconds: 123 /* Math.trunc(Date.now() / 1000) */,
nanos: 456 /* Date.now() % 1000 */,
}),
messageGroupId: 'abc', // TODO: Check examples
partitionId: 1,
// metadataItems: // TODO: Should I use this?
}],
});

console.info('res1:', res1);

const res2 = await writer.sendMessages({
// tx:
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.alloc(10, '1234567890'),
uncompressedSize: '1234567890'.length,
seqNo: 1,
createdAt: google.protobuf.Timestamp.create({
seconds: 123 /*Date.now() / 1000*/,
nanos: 456 /*Date.now() % 1000*/,
}),
messageGroupId: 'abc', // TODO: Check examples
partitionId: 1,
// metadataItems: // TODO: Should I use this?
}],
});

console.info('res2:', res2);

const res3 = await writer.sendMessages({
// tx:
codec: Ydb.Topic.Codec.CODEC_RAW,
messages: [{
data: Buffer.alloc(10, '1234567890'),
uncompressedSize: '1234567890'.length,
seqNo: 1,
createdAt: google.protobuf.Timestamp.create({
seconds: 123 /*Date.now() / 1000*/,
nanos: 456 /*Date.now() % 1000*/,
}),
messageGroupId: 'abc', // TODO: Check examples
partitionId: 1,
// metadataItems: // TODO: Should I use this?
}],
});

console.info('res3:', res3);

// TODO: Send few messages

// TODO: Wait for ack

// TODO: Close before all messages are acked

// xdescribe('Topic: Send messages', () => {
// let ydb: YDB | undefined;
Expand Down
12 changes: 9 additions & 3 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export enum StatusCode {

UNAUTHENTICATED = CLIENT_STATUSES_FIRST + 30, // SDK local
SESSION_POOL_EMPTY = CLIENT_STATUSES_FIRST + 40, // SDK local
RETRIES_EXCEEDED = CLIENT_STATUSES_FIRST + 50, // SDK local
}

/**
Expand Down Expand Up @@ -314,15 +315,20 @@ export class ClientResourceExhausted extends TransportError {
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true);
}

export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying
static status = StatusCode.CLIENT_CANCELED;
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false);
}

const TRANSPORT_ERROR_CODES = new Map([
[GrpcStatus.CANCELLED, Cancelled],
[GrpcStatus.CANCELLED, ClientCancelled],
[GrpcStatus.UNAVAILABLE, TransportUnavailable],
[GrpcStatus.DEADLINE_EXCEEDED, ClientDeadlineExceeded],
[GrpcStatus.RESOURCE_EXHAUSTED, ClientResourceExhausted]
]);

export class ClientCancelled extends YdbError {
static status = StatusCode.CLIENT_CANCELED;
export class RetriesExceeded extends YdbError {
static status = StatusCode.RETRIES_EXCEEDED;
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false);

constructor(public readonly cause: Error) {
Expand Down
4 changes: 2 additions & 2 deletions src/retries/retryStrategy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Backoff, ClientCancelled, SpecificErrorRetryPolicy} from "../errors";
import {Backoff, RetriesExceeded, SpecificErrorRetryPolicy} from "../errors";
import {HasLogger} from "../logger/has-logger";
import {Logger} from "../logger/simple-logger";
import {RetryParameters} from "./retryParameters";
Expand Down Expand Up @@ -45,7 +45,7 @@ export class RetryStrategy implements HasLogger {
while (true) {
if (maxRetries !== 0 && attemptsCounter >= maxRetries) { // to support the old logic for a while
this.logger.debug(tooManyAttempts, attemptsCounter);
throw new ClientCancelled(new Error(`Too many attempts: ${attemptsCounter}`));
throw new RetriesExceeded(new Error(`Too many attempts: ${attemptsCounter}`));
}
let r: RetryLambdaResult<T>;
try {
Expand Down
4 changes: 2 additions & 2 deletions src/retries_obsoleted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {YdbError, TransportError} from './errors';
import * as errors from './errors';
import * as utils from "./utils";
import {Logger} from "./logger/simple-logger";
// import {getDefaultLogger} from "./logger/get-default-logger";

export class BackoffSettings {
/**
Expand Down Expand Up @@ -56,10 +55,11 @@ const RETRYABLE_ERRORS_FAST = [
errors.NotFound,
errors.TransportUnavailable,
errors.ClientDeadlineExceeded,
errors.ClientCancelled,
];
const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted];

class RetryStrategy {
export class RetryStrategy {
// private logger: Logger;
constructor(
public methodName = 'UnknownClass::UnknownMethod',
Expand Down
Loading

0 comments on commit 924770a

Please sign in to comment.