From 4c24c2685e6012d981467074d27ec7cb8f67a30d Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Tue, 1 Oct 2024 10:05:18 +0000 Subject: [PATCH 01/16] BulkUpsert in Table Client --- internal/table/client.go | 66 +++++- internal/table/session.go | 58 ++--- table/options/options.go | 28 +++ table/table.go | 229 ++++++++++++++++++-- tests/integration/helpers_test.go | 4 +- tests/integration/table_bulk_upsert_test.go | 141 ++++++++++++ trace/table.go | 5 +- trace/table_gtrace.go | 28 +++ 8 files changed, 488 insertions(+), 71 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index c14386497..dc7262422 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -4,8 +4,8 @@ import ( "context" "github.com/jonboulle/clockwork" - "google.golang.org/grpc" - + "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" @@ -14,6 +14,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/trace" + "google.golang.org/grpc" ) // sessionBuilder is the interface that holds logic of creating sessions. @@ -25,9 +26,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config ) return &Client{ - clock: config.Clock(), - config: config, - cc: cc, + clock: config.Clock(), + config: config, + service: Ydb_Table_V1.NewTableServiceClient(cc), + cc: cc, build: func(ctx context.Context) (s *session, err error) { return newSession(ctx, cc, config) }, @@ -85,12 +87,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config // A Client is safe for use by multiple goroutines simultaneously. 
type Client struct { // read-only fields - config *config.Config - build sessionBuilder - cc grpc.ClientConnInterface - clock clockwork.Clock - pool sessionPool - done chan struct{} + config *config.Config + service Ydb_Table_V1.TableServiceClient + build sessionBuilder + cc grpc.ClientConnInterface + clock clockwork.Clock + pool sessionPool + done chan struct{} } func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) { @@ -257,6 +260,47 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O }, config.RetryOptions...) } +func (c *Client) BulkUpsert( + ctx context.Context, + tableName string, + data table.BulkUpsertData, + opts ...table.Option, +) (finalErr error) { + if c == nil { + return xerrors.WithStackTrace(errNilClient) + } + + if c.isClosed() { + return xerrors.WithStackTrace(errClosedClient) + } + + config := c.retryOptions(opts...) + + attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"), + config.Label, + ) + defer func() { + onDone(attempts, finalErr) + }() + + return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) { + attempts++ + + req := Ydb_Table.BulkUpsertRequest{ + Table: tableName, + } + err = data.ApplyBulkUpsertRequest((*table.BulkUpsertRequest)(&req)) + if err != nil { + return err + } + + _, err = c.service.BulkUpsert(ctx, &req, config.CallOptions...) + + return err + }, config.RetryOptions...) 
+} + func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) { if panicCallback := c.config.PanicCallback(); panicCallback != nil { defer func() { diff --git a/internal/table/session.go b/internal/table/session.go index cd87683d1..4b58bada7 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -48,6 +48,7 @@ import ( type session struct { onClose []func(s *session) id string + cc grpc.ClientConnInterface tableService Ydb_Table_V1.TableServiceClient status table.SessionStatus config *config.Config @@ -150,6 +151,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config id: result.GetSessionId(), config: config, status: table.SessionReady, + cc: cc, } s.lastUsage.Store(time.Now().Unix()) @@ -1251,48 +1253,32 @@ func (s *session) StreamExecuteScanQuery( ) } +type bulkUpsertOptionToTableOption struct { + opt []options.BulkUpsertOption +} + +func (opt bulkUpsertOptionToTableOption) ApplyTableOption(tableOpts *options.TableOptions) { + var callOpts []grpc.CallOption + for _, o := range opt.opt { + callOpts = append(callOpts, o.ApplyBulkUpsertOption()...) + } + tableOpts.CallOptions = append(tableOpts.CallOptions, callOpts...) +} + // BulkUpsert uploads given list of ydb struct values to the table. -func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value, +func (s *session) BulkUpsert(ctx context.Context, tableName string, rows value.Value, opts ...options.BulkUpsertOption, ) (err error) { - var ( - a = allocator.New() - callOptions []grpc.CallOption - onDone = trace.TableOnSessionBulkUpsert( - s.config.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"), - s, - ) - ) - defer func() { - defer a.Free() - onDone(err) - }() - - for _, opt := range opts { - if opt != nil { - callOptions = append(callOptions, opt.ApplyBulkUpsertOption()...) 
- } - } + client := New(ctx, s.cc, config.New()) - _, err = s.tableService.BulkUpsert(ctx, - &Ydb_Table.BulkUpsertRequest{ - Table: table, - Rows: value.ToYDB(rows, a), - OperationParams: operation.Params( - ctx, - s.config.OperationTimeout(), - s.config.OperationCancelAfter(), - operation.ModeSync, - ), + return client.BulkUpsert( + ctx, + tableName, + table.BulkUpsertRows{ + Rows: rows, }, - callOptions..., + bulkUpsertOptionToTableOption{opts}, ) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil } // BeginTransaction begins new transaction within given session with given settings. diff --git a/table/options/options.go b/table/options/options.go index 922333a17..83b8e90f3 100644 --- a/table/options/options.go +++ b/table/options/options.go @@ -8,6 +8,8 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) func WithShardKeyBounds() DescribeTableOption { @@ -869,6 +871,28 @@ func WithKeepInCache(keepInCache bool) ExecuteDataQueryOption { ) } +type TableOptions struct { + Label string + Idempotent bool + TxSettings *TransactionSettings + TxCommitOptions []CommitTransactionOption + RetryOptions []retry.Option + Trace *trace.Table + CallOptions []grpc.CallOption +} + +type TransactionSettings struct { + settings Ydb_Table.TransactionSettings +} + +func (t *TransactionSettings) Settings() *Ydb_Table.TransactionSettings { + if t == nil { + return nil + } + + return &t.settings +} + type withCallOptions []grpc.CallOption func (opts withCallOptions) ApplyExecuteScanQueryOption(d *ExecuteScanQueryDesc) []grpc.CallOption { @@ -885,6 +909,10 @@ func (opts withCallOptions) ApplyExecuteDataQueryOption( return opts } +func (opts withCallOptions) ApplyTableOption(tableOpts *TableOptions) { + tableOpts.CallOptions = 
append(tableOpts.CallOptions, opts...) +} + // WithCallOptions appends flag of commit transaction with executing query func WithCallOptions(opts ...grpc.CallOption) withCallOptions { return opts diff --git a/table/table.go b/table/table.go index 1ef386664..a02a954a3 100644 --- a/table/table.go +++ b/table/table.go @@ -4,8 +4,10 @@ import ( "context" "time" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" @@ -68,6 +70,12 @@ type Client interface { // If op TxOperation return non nil - transaction will be rollback // Warning: if context without deadline or cancellation func than DoTx can run indefinitely DoTx(ctx context.Context, op TxOperation, opts ...Option) error + + // BulkUpsert upserts a batch of rows non-transactionally. + // + // Returns success only when all rows were successfully upserted. In case of an error some rows might + // be upserted and some might not. + BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error } type SessionStatus = string @@ -179,6 +187,7 @@ type Session interface { opts ...options.ExecuteScanQueryOption, ) (_ result.StreamResult, err error) + // Deprecated: use Client instance instead. BulkUpsert( ctx context.Context, table string, @@ -203,17 +212,7 @@ type Session interface { ) error } -type TransactionSettings struct { - settings Ydb_Table.TransactionSettings -} - -func (t *TransactionSettings) Settings() *Ydb_Table.TransactionSettings { - if t == nil { - return nil - } - - return &t.settings -} +type TransactionSettings = options.TransactionSettings // Explanation is a result of Explain calls. 
type Explanation struct { @@ -306,7 +305,7 @@ func TxSettings(opts ...TxOption) *TransactionSettings { s := new(TransactionSettings) for _, opt := range opts { if opt != nil { - opt((*txDesc)(&s.settings)) + opt((*txDesc)(s.Settings())) } } @@ -318,7 +317,7 @@ func BeginTx(opts ...TxOption) TxControlOption { return func(d *txControlDesc) { s := TxSettings(opts...) d.TxSelector = &Ydb_Table.TransactionControl_BeginTx{ - BeginTx: &s.settings, + BeginTx: s.Settings(), } } } @@ -489,14 +488,7 @@ func ValueParam(name string, v value.Value) ParameterOption { return params.Named(name, v) } -type Options struct { - Label string - Idempotent bool - TxSettings *TransactionSettings - TxCommitOptions []options.CommitTransactionOption - RetryOptions []retry.Option - Trace *trace.Table -} +type Options = options.TableOptions type Option interface { ApplyTableOption(opts *Options) @@ -578,3 +570,198 @@ func (opt traceOption) ApplyTableOption(opts *Options) { func WithTrace(t trace.Table) traceOption { //nolint:gocritic return traceOption{t: &t} } + +type ( + BulkUpsertRequest Ydb_Table.BulkUpsertRequest +) + +type BulkUpsertData interface { + ApplyBulkUpsertRequest(req *BulkUpsertRequest) error +} + +type BulkUpsertRows struct { + Rows value.Value +} + +func (data BulkUpsertRows) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { + a := allocator.New() + + defer func() { + defer a.Free() + }() + + req.Rows = value.ToYDB(data.Rows, a) + + return nil +} + +type BulkUpsertCsv struct { + Data []byte + Options []CsvFormatOption +} + +type CsvFormatOption interface { + ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) +} + +func (data BulkUpsertCsv) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { + req.Data = data.Data + + var err error + for _, opt := range data.Options { + if opt != nil { + err = opt.ApplyCsvFormatOption(req) + if err != nil { + return err + } + } + } + + return err +} + +func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) BulkUpsertCsv { + 
return BulkUpsertCsv{ + Data: data, + Options: opts, + } +} + +func ensureCsvDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.CsvSettings) { + if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings); ok { + if settings.CsvSettings == nil { + settings.CsvSettings = &Ydb_Formats.CsvSettings{} + } + + return settings.CsvSettings + } + + req.DataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + } + + settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings) + if !ok { + return nil + } + + return settings.CsvSettings +} + +type csvHeaderOption struct{} + +func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { + ensureCsvDataFormatSettings(req).Header = true + + return nil +} + +// First not skipped line is a CSV header (list of column names). +func WithCsvHeader() CsvFormatOption { + return &csvHeaderOption{} +} + +type csvNullValueOption struct { + Value []byte +} + +func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { + ensureCsvDataFormatSettings(req).NullValue = opt.Value + + return nil +} + +// String value that would be interpreted as NULL. +func WithCsvNullValue(value []byte) CsvFormatOption { + return &csvNullValueOption{value} +} + +type csvDelimiterOption struct { + Value []byte +} + +func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { + ensureCsvDataFormatSettings(req).Delimiter = opt.Value + + return nil +} + +// Fields delimiter in CSV file. It's "," if not set. +func WithCsvDelimiter(value []byte) CsvFormatOption { + return &csvDelimiterOption{value} +} + +type csvSkipRowsOption struct { + Count uint32 +} + +func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { + ensureCsvDataFormatSettings(req).SkipRows = opt.Count + + return nil +} + +// Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file. 
+func WithCsvSkipRows(count uint32) CsvFormatOption { + return &csvSkipRowsOption{count} +} + +type BulkUpsertArrow struct { + Data []byte + Options []ArrowFormatOption +} + +type ArrowFormatOption interface { + ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) +} + +func (data BulkUpsertArrow) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { + req.Data = data.Data + + var err error + for _, opt := range data.Options { + if opt != nil { + err = opt.ApplyArrowFormatOption(req) + if err != nil { + return err + } + } + } + + return err +} + +func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) { + if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok { + if settings.ArrowBatchSettings == nil { + settings.ArrowBatchSettings = &Ydb_Formats.ArrowBatchSettings{} + } + + return settings.ArrowBatchSettings + } + + req.DataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, + } + + settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) + if !ok { + return nil + } + + return settings.ArrowBatchSettings +} + +type arrowSchemaOption struct { + Schema []byte +} + +func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) error { + ensureArrowDataFormatSettings(req).Schema = opt.Schema + + return nil +} + +func WithArrowSchema(schema []byte) ArrowFormatOption { + return &arrowSchemaOption{schema} +} diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index c64f329f2..91f1c892d 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -180,7 +180,7 @@ func (scope *scopeT) Folder() string { scope.Require.NoError(sugar.RemoveRecursive(scope.Ctx, driver, folderPath)) } } - scope.Logf("Createing folder done: %v", folderPath) + scope.Logf("Creating folder done: %v", folderPath) return 
fixenv.NewGenericResultWithCleanup(folderPath, clean), nil } return fixenv.CacheResult(scope.Env, f) @@ -310,7 +310,7 @@ func (scope *scopeT) TableName(opts ...func(t *tableNameParams)) string { createTableQueryTemplate: ` PRAGMA TablePathPrefix("{{.TablePathPrefix}}"); CREATE TABLE {{.TableName}} ( - id Int64 NOT NULL, + id Int64 NOT NULL, val Text, PRIMARY KEY (id) ) diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index 33523cb93..45ae60a24 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -6,9 +6,13 @@ package integration import ( "context" "fmt" + "os" "testing" + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) @@ -33,3 +37,140 @@ func TestTableBulkUpsert(t *testing.T) { }) scope.Require.NoError(err) } + +func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + // ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), + ) + require.NoError(t, err) + err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { + res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` WHERE id = %d", tableName, id), nil) + if err != nil { + return err + } + err = res.NextResultSetErr(ctx) + if err != nil { + return err + } + require.EqualValues(t, 1, res.ResultSetCount()) + if !res.NextRow() { + if err = res.Err(); err != nil { + return err + } + return fmt.Errorf("unexpected empty result set") + } + var resultVal *string + err = res.ScanNamed( + named.Optional("val", &resultVal), + ) + if err != nil { + return err + } + if val != nil 
{ + require.NotEmpty(t, resultVal) + require.EqualValues(t, *val, *resultVal) + } else { + require.Nil(t, resultVal) + } + + return res.Err() + }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent()) + require.NoError(t, err) +} + +func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) { + assertIdValueImpl(ctx, t, tableName, id, &val) +} + +func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) { + assertIdValueImpl(ctx, t, tableName, id, nil) +} + +func TestTableCsvBulkUpsert(t *testing.T) { + scope := newScope(t) + driver := scope.Driver() + tablePath := scope.TablePath() + + csv := `id,val +42,"text42" +43,"text43"` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + []byte(csv), + table.WithCsvHeader(), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "text42") + assertIdValue(scope.Ctx, t, tablePath, 43, "text43") +} + +func TestTableCsvBulkUpsertDelimiter(t *testing.T) { + scope := newScope(t) + driver := scope.Driver() + tablePath := scope.TablePath() + + csv := `id:val +42:"text42" +43:"text43"` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvDelimiter([]byte(":")), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "text42") + assertIdValue(scope.Ctx, t, tablePath, 43, "text43") +} + +func TestTableCsvBulkUpsertNullValue(t *testing.T) { + scope := newScope(t) + driver := scope.Driver() + tablePath := scope.TablePath() + + csv := `id,val +42,hello +43,hello world` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvNullValue([]byte("hello")), + )) + scope.Require.NoError(err) + + assertIdValueNil(scope.Ctx, t, tablePath, 42) + assertIdValue(scope.Ctx, t, tablePath, 43, "hello 
world") +} + +func TestTableCsvBulkUpsertSkipRows(t *testing.T) { + scope := newScope(t) + driver := scope.Driver() + tablePath := scope.TablePath() + + // Empty row are OK after skipped rows + csv := `First skip row + Second skip row + +id,val +42,123 +43,456 + +` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvSkipRows(2), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "123") + assertIdValue(scope.Ctx, t, tablePath, 43, "456") +} diff --git a/trace/table.go b/trace/table.go index 5589327b1..067c9f4be 100644 --- a/trace/table.go +++ b/trace/table.go @@ -23,6 +23,8 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnDoTx func(TableDoTxStartInfo) func(TableDoTxDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnCreateSession func(TableCreateSessionStartInfo) func(TableCreateSessionDoneInfo) // Session events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -146,7 +148,8 @@ type ( } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableBulkUpsertDoneInfo struct { - Error error + Error error + Attempts int } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableSessionDeleteStartInfo struct { diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index c548157b1..38eb5e004 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -957,6 +957,21 @@ func (t *Table) onDoTx(t1 TableDoTxStartInfo) func(TableDoTxDoneInfo) { } return res } +func (t *Table) onBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + fn := 
t.OnBulkUpsert + if fn == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + res := fn(t1) + if res == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + return res +} func (t *Table) onCreateSession(t1 TableCreateSessionStartInfo) func(TableCreateSessionDoneInfo) { fn := t.OnCreateSession if fn == nil { @@ -1320,6 +1335,19 @@ func TableOnDoTx(t *Table, c *context.Context, call call, label string, idempote } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnBulkUpsert(t *Table, c *context.Context, call call, label string) func(attempts int, _ error) { + var p TableBulkUpsertStartInfo + p.Context = c + p.Call = call + res := t.onBulkUpsert(p) + return func(attempts int, e error) { + var p TableBulkUpsertDoneInfo + p.Attempts = attempts + p.Error = e + res(p) + } +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TableOnCreateSession(t *Table, c *context.Context, call call) func(session sessionInfo, attempts int, _ error) { var p TableCreateSessionStartInfo p.Context = c From 5a82d2f517425006eae2f8aab0b75196a5883cac Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Tue, 1 Oct 2024 16:37:51 +0000 Subject: [PATCH 02/16] Arrow test --- table/table.go | 7 ++++++ tests/integration/table_bulk_upsert_test.go | 22 ++++++++++++++++++ .../testdata/bulk_upsert_test_data.arrow | Bin 0 -> 256 bytes .../testdata/bulk_upsert_test_schema.arrow | Bin 0 -> 176 bytes tests/integration/testdata/make_test_arrow.py | 17 ++++++++++++++ 5 files changed, 46 insertions(+) create mode 100644 tests/integration/testdata/bulk_upsert_test_data.arrow create mode 100644 tests/integration/testdata/bulk_upsert_test_schema.arrow create mode 100755 tests/integration/testdata/make_test_arrow.py diff --git a/table/table.go b/table/table.go index a02a954a3..78885ee72 100644 --- a/table/table.go +++ b/table/table.go @@ -731,6 +731,13 @@ func (data BulkUpsertArrow) 
ApplyBulkUpsertRequest(req *BulkUpsertRequest) error return err } +func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) BulkUpsertArrow { + return BulkUpsertArrow{ + Data: data, + Options: opts, + } +} + func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) { if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok { if settings.ArrowBatchSettings == nil { diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index 45ae60a24..c8f916a72 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -174,3 +174,25 @@ id,val assertIdValue(scope.Ctx, t, tablePath, 42, "123") assertIdValue(scope.Ctx, t, tablePath, 43, "456") } + +func TestTableArrowBulkUpsert(t *testing.T) { + scope := newScope(t) + driver := scope.Driver() + tablePath := scope.TablePath() + + // data & schema generated with make_test_arrow.py script + data, err := os.ReadFile("testdata/bulk_upsert_test_data.arrow") + scope.Require.NoError(err) + + schema, err := os.ReadFile("testdata/bulk_upsert_test_schema.arrow") + scope.Require.NoError(err) + + err = driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertArrow( + []byte(data), + table.WithArrowSchema(schema), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 123, "data1") + assertIdValue(scope.Ctx, t, tablePath, 234, "data2") +} diff --git a/tests/integration/testdata/bulk_upsert_test_data.arrow b/tests/integration/testdata/bulk_upsert_test_data.arrow new file mode 100644 index 0000000000000000000000000000000000000000..c1cba98bc756431a708d1b5f6e71254684bfb173 GIT binary patch literal 256 zcmezW|Ns9J3=9k+Knwyr3}Osy46F3fSH9s0!SHv*kHf~Ig2aNv%$Yb6w<*t6Q#q-vILZ42u8& literal 0 HcmV?d00001 diff --git a/tests/integration/testdata/make_test_arrow.py b/tests/integration/testdata/make_test_arrow.py new file mode 100755 index 
000000000..e4b07f802 --- /dev/null +++ b/tests/integration/testdata/make_test_arrow.py @@ -0,0 +1,17 @@ +#! /usr/bin/env python3 +import pyarrow + +schema = pyarrow.schema([("id", pyarrow.int64()), ("val", pyarrow.utf8())]) + +with open("bulk_upsert_test_schema.arrow", "bw") as schema_file: + schema_file.write(schema.serialize().to_pybytes()) + +data = [ + pyarrow.array([123, 234]), + pyarrow.array(["data1", "data2"]), +] + +batch = pyarrow.record_batch(data, schema=schema) + +with open("bulk_upsert_test_data.arrow", "bw") as data_file: + data_file.write(batch.serialize()) From 103a56535af94dd81e2e7a96c7b51584073f52fe Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Tue, 1 Oct 2024 17:09:33 +0000 Subject: [PATCH 03/16] Fix import order --- internal/table/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/table/client.go b/internal/table/client.go index dc7262422..880ee584c 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -6,6 +6,8 @@ import ( "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" + "google.golang.org/grpc" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" @@ -14,7 +16,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/trace" - "google.golang.org/grpc" ) // sessionBuilder is the interface that holds logic of creating sessions. 
From 1d60745cc6f9430410ebee1782f2e574f2e0c7d8 Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Tue, 1 Oct 2024 17:16:22 +0000 Subject: [PATCH 04/16] Changelog --- CHANGELOG.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f4a812935..5f009e3ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Support bulk upsert from csv, arrow and ydb internal formats in table client + ## v3.81.4 * Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0` @@ -8,7 +10,7 @@ * Removed `experimantal` comment for query service client ## v3.81.1 -* Fixed nil pointer dereference panic on failed `ydb.Open` +* Fixed nil pointer dereference panic on failed `ydb.Open` * Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving. ## v3.81.0 @@ -16,7 +18,7 @@ * Added write to topics within transactions ## v3.80.10 -* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage +* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage * Refactored experimental topic iterators in `topicsugar` package ## v3.80.9 From f0488e8f2e20272fc82082949ccb2dddcd8421dd Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Tue, 1 Oct 2024 17:19:23 +0000 Subject: [PATCH 05/16] Style --- tests/integration/table_bulk_upsert_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index c8f916a72..0991f7b21 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" From
1a67a9e8ec42b09b7a21562f5b2c653204b0179c Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Wed, 2 Oct 2024 14:40:14 +0000 Subject: [PATCH 06/16] Fix trace --- internal/table/client.go | 10 +++-- table/table.go | 14 ++---- tests/integration/table_bulk_upsert_test.go | 48 +++++++++++++-------- trace/table.go | 1 - trace/table_gtrace.go | 47 +++++++++++++++++--- 5 files changed, 82 insertions(+), 38 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 880ee584c..2238b7720 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -8,6 +8,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "google.golang.org/grpc" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" @@ -275,14 +276,17 @@ func (c *Client) BulkUpsert( return xerrors.WithStackTrace(errClosedClient) } + a := allocator.New() + defer a.Free() + config := c.retryOptions(opts...) 
+ config.RetryOptions = append(config.RetryOptions, retry.WithIdempotent(true)) attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"), - config.Label, ) defer func() { - onDone(attempts, finalErr) + onDone(finalErr, attempts) }() return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) { @@ -291,7 +295,7 @@ func (c *Client) BulkUpsert( req := Ydb_Table.BulkUpsertRequest{ Table: tableName, } - err = data.ApplyBulkUpsertRequest((*table.BulkUpsertRequest)(&req)) + err = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&req)) if err != nil { return err } diff --git a/table/table.go b/table/table.go index 78885ee72..c93b40edd 100644 --- a/table/table.go +++ b/table/table.go @@ -576,20 +576,14 @@ type ( ) type BulkUpsertData interface { - ApplyBulkUpsertRequest(req *BulkUpsertRequest) error + ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error } type BulkUpsertRows struct { Rows value.Value } -func (data BulkUpsertRows) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { - a := allocator.New() - - defer func() { - defer a.Free() - }() - +func (data BulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { req.Rows = value.ToYDB(data.Rows, a) return nil @@ -604,7 +598,7 @@ type CsvFormatOption interface { ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) } -func (data BulkUpsertCsv) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { +func (data BulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { req.Data = data.Data var err error @@ -715,7 +709,7 @@ type ArrowFormatOption interface { ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) } -func (data BulkUpsertArrow) ApplyBulkUpsertRequest(req *BulkUpsertRequest) error { +func (data BulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req 
*BulkUpsertRequest) error { req.Data = data.Data var err error diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index 0991f7b21..82465b31a 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -18,9 +18,11 @@ import ( ) func TestTableBulkUpsert(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) // upsert var rows []types.Value @@ -92,9 +94,11 @@ func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id in } func TestTableCsvBulkUpsert(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) csv := `id,val 42,"text42" @@ -111,9 +115,11 @@ func TestTableCsvBulkUpsert(t *testing.T) { } func TestTableCsvBulkUpsertDelimiter(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) csv := `id:val 42:"text42" @@ -131,9 +137,11 @@ func TestTableCsvBulkUpsertDelimiter(t *testing.T) { } func TestTableCsvBulkUpsertNullValue(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) csv := `id,val 42,hello @@ -151,9 +159,11 @@ func TestTableCsvBulkUpsertNullValue(t *testing.T) { } func TestTableCsvBulkUpsertSkipRows(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) // Empty row are OK after skipped rows csv := `First skip row @@ -177,9 +187,11 @@ id,val } func 
TestTableArrowBulkUpsert(t *testing.T) { - scope := newScope(t) - driver := scope.Driver() - tablePath := scope.TablePath() + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) // data & schema generated with make_test_arrow.py script data, err := os.ReadFile("testdata/bulk_upsert_test_data.arrow") diff --git a/trace/table.go b/trace/table.go index 067c9f4be..3e378d3ba 100644 --- a/trace/table.go +++ b/trace/table.go @@ -144,7 +144,6 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - Session sessionInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableBulkUpsertDoneInfo struct { diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index 38eb5e004..ee982df07 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -173,6 +173,41 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } } + { + h1 := t.OnBulkUpsert + h2 := x.OnBulkUpsert + ret.OnBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(TableBulkUpsertDoneInfo) + if h1 != nil { + r = h1(t) + } + if h2 != nil { + r1 = h2(t) + } + return func(t TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(t) + } + if r1 != nil { + r1(t) + } + } + } + } { h1 := t.OnCreateSession h2 := x.OnCreateSession @@ -1335,15 +1370,15 @@ func TableOnDoTx(t *Table, c *context.Context, call call, label string, idempote } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnBulkUpsert(t *Table, c *context.Context, call call, label string) func(attempts int, _ error) { +func TableOnBulkUpsert(t *Table, c 
*context.Context, call call) func(_ error, attempts int) { var p TableBulkUpsertStartInfo p.Context = c p.Call = call res := t.onBulkUpsert(p) - return func(attempts int, e error) { + return func(e error, attempts int) { var p TableBulkUpsertDoneInfo - p.Attempts = attempts p.Error = e + p.Attempts = attempts res(p) } } @@ -1401,15 +1436,15 @@ func TableOnSessionKeepAlive(t *Table, c *context.Context, call call, session se } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call, session sessionInfo) func(error) { +func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(_ error, attempts int) { var p TableBulkUpsertStartInfo p.Context = c p.Call = call - p.Session = session res := t.onSessionBulkUpsert(p) - return func(e error) { + return func(e error, attempts int) { var p TableBulkUpsertDoneInfo p.Error = e + p.Attempts = attempts res(p) } } From 70fe1c4ead76615e605ce124d3f44a31ba518ac5 Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Wed, 2 Oct 2024 15:25:50 +0000 Subject: [PATCH 07/16] Fix review issues --- internal/table/client.go | 50 ++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 2238b7720..8adbf98c7 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -28,10 +28,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config ) return &Client{ - clock: config.Clock(), - config: config, - service: Ydb_Table_V1.NewTableServiceClient(cc), - cc: cc, + clock: config.Clock(), + config: config, + client: Ydb_Table_V1.NewTableServiceClient(cc), + cc: cc, build: func(ctx context.Context) (s *session, err error) { return newSession(ctx, cc, config) }, @@ -89,13 +89,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config // A Client is safe for use by 
multiple goroutines simultaneously. type Client struct { // read-only fields - config *config.Config - service Ydb_Table_V1.TableServiceClient - build sessionBuilder - cc grpc.ClientConnInterface - clock clockwork.Clock - pool sessionPool - done chan struct{} + config *config.Config + client Ydb_Table_V1.TableServiceClient + build sessionBuilder + cc grpc.ClientConnInterface + clock clockwork.Clock + pool sessionPool + done chan struct{} } func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ table.ClosableSession, err error) { @@ -289,21 +289,25 @@ func (c *Client) BulkUpsert( onDone(finalErr, attempts) }() - return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) { - attempts++ + request := Ydb_Table.BulkUpsertRequest{ + Table: tableName, + } + finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request)) + if finalErr != nil { + return finalErr + } - req := Ydb_Table.BulkUpsertRequest{ - Table: tableName, - } - err = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&req)) - if err != nil { - return err - } + finalErr = retry.Retry(ctx, + func(ctx context.Context) (err error) { + attempts++ + _, err = c.client.BulkUpsert(ctx, &request, config.CallOptions...) - _, err = c.service.BulkUpsert(ctx, &req, config.CallOptions...) + return err + }, + config.RetryOptions..., + ) - return err - }, config.RetryOptions...) 
+ return xerrors.WithStackTrace(finalErr) } func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) { From edfaf7f403524ccd53a2ee1c9838d3fbc5dc691d Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Fri, 4 Oct 2024 10:29:11 +0000 Subject: [PATCH 08/16] Don't use table method in deprecated session method --- internal/table/session.go | 57 ++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/internal/table/session.go b/internal/table/session.go index 4b58bada7..c6b23c335 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -48,7 +48,6 @@ import ( type session struct { onClose []func(s *session) id string - cc grpc.ClientConnInterface tableService Ydb_Table_V1.TableServiceClient status table.SessionStatus config *config.Config @@ -151,7 +150,6 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config id: result.GetSessionId(), config: config, status: table.SessionReady, - cc: cc, } s.lastUsage.Store(time.Now().Unix()) @@ -1253,32 +1251,47 @@ func (s *session) StreamExecuteScanQuery( ) } -type bulkUpsertOptionToTableOption struct { - opt []options.BulkUpsertOption -} - -func (opt bulkUpsertOptionToTableOption) ApplyTableOption(tableOpts *options.TableOptions) { - var callOpts []grpc.CallOption - for _, o := range opt.opt { - callOpts = append(callOpts, o.ApplyBulkUpsertOption()...) - } - tableOpts.CallOptions = append(tableOpts.CallOptions, callOpts...) -} - // BulkUpsert uploads given list of ydb struct values to the table. 
-func (s *session) BulkUpsert(ctx context.Context, tableName string, rows value.Value, +func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value, opts ...options.BulkUpsertOption, ) (err error) { - client := New(ctx, s.cc, config.New()) + var ( + a = allocator.New() + callOptions []grpc.CallOption + onDone = trace.TableOnSessionBulkUpsert( + s.config.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"), + ) + ) + defer func() { + defer a.Free() + onDone(err, 1) + }() + + for _, opt := range opts { + if opt != nil { + callOptions = append(callOptions, opt.ApplyBulkUpsertOption()...) + } + } - return client.BulkUpsert( - ctx, - tableName, - table.BulkUpsertRows{ - Rows: rows, + _, err = s.tableService.BulkUpsert(ctx, + &Ydb_Table.BulkUpsertRequest{ + Table: table, + Rows: value.ToYDB(rows, a), + OperationParams: operation.Params( + ctx, + s.config.OperationTimeout(), + s.config.OperationCancelAfter(), + operation.ModeSync, + ), }, - bulkUpsertOptionToTableOption{opts}, + callOptions..., ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil } // BeginTransaction begins new transaction within given session with given settings. From 327ee249b6b8bc9af1850b082d2594013f7efc5e Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Fri, 4 Oct 2024 10:42:34 +0000 Subject: [PATCH 09/16] Leave options as they were --- internal/table/client.go | 2 +- table/options/options.go | 28 ---------------------------- table/table.go | 25 +++++++++++++++++++++---- 3 files changed, 22 insertions(+), 33 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 064d3446c..bc98e66ef 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -307,7 +307,7 @@ func (c *Client) BulkUpsert( finalErr = retry.Retry(ctx, func(ctx context.Context) (err error) { attempts++ - _, err = c.client.BulkUpsert(ctx, &request, config.CallOptions...) 
+ _, err = c.client.BulkUpsert(ctx, &request) return err }, diff --git a/table/options/options.go b/table/options/options.go index 83b8e90f3..922333a17 100644 --- a/table/options/options.go +++ b/table/options/options.go @@ -8,8 +8,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" - "github.com/ydb-platform/ydb-go-sdk/v3/retry" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) func WithShardKeyBounds() DescribeTableOption { @@ -871,28 +869,6 @@ func WithKeepInCache(keepInCache bool) ExecuteDataQueryOption { ) } -type TableOptions struct { - Label string - Idempotent bool - TxSettings *TransactionSettings - TxCommitOptions []CommitTransactionOption - RetryOptions []retry.Option - Trace *trace.Table - CallOptions []grpc.CallOption -} - -type TransactionSettings struct { - settings Ydb_Table.TransactionSettings -} - -func (t *TransactionSettings) Settings() *Ydb_Table.TransactionSettings { - if t == nil { - return nil - } - - return &t.settings -} - type withCallOptions []grpc.CallOption func (opts withCallOptions) ApplyExecuteScanQueryOption(d *ExecuteScanQueryDesc) []grpc.CallOption { @@ -909,10 +885,6 @@ func (opts withCallOptions) ApplyExecuteDataQueryOption( return opts } -func (opts withCallOptions) ApplyTableOption(tableOpts *TableOptions) { - tableOpts.CallOptions = append(tableOpts.CallOptions, opts...) 
-} - // WithCallOptions appends flag of commit transaction with executing query func WithCallOptions(opts ...grpc.CallOption) withCallOptions { return opts diff --git a/table/table.go b/table/table.go index c93b40edd..0173324d4 100644 --- a/table/table.go +++ b/table/table.go @@ -212,7 +212,17 @@ type Session interface { ) error } -type TransactionSettings = options.TransactionSettings +type TransactionSettings struct { + settings Ydb_Table.TransactionSettings +} + +func (t *TransactionSettings) Settings() *Ydb_Table.TransactionSettings { + if t == nil { + return nil + } + + return &t.settings +} // Explanation is a result of Explain calls. type Explanation struct { @@ -305,7 +315,7 @@ func TxSettings(opts ...TxOption) *TransactionSettings { s := new(TransactionSettings) for _, opt := range opts { if opt != nil { - opt((*txDesc)(s.Settings())) + opt((*txDesc)(&s.settings)) } } @@ -317,7 +327,7 @@ func BeginTx(opts ...TxOption) TxControlOption { return func(d *txControlDesc) { s := TxSettings(opts...) 
d.TxSelector = &Ydb_Table.TransactionControl_BeginTx{ - BeginTx: s.Settings(), + BeginTx: &s.settings, } } } @@ -488,7 +498,14 @@ func ValueParam(name string, v value.Value) ParameterOption { return params.Named(name, v) } -type Options = options.TableOptions +type Options struct { + Label string + Idempotent bool + TxSettings *TransactionSettings + TxCommitOptions []options.CommitTransactionOption + RetryOptions []retry.Option + Trace *trace.Table +} type Option interface { ApplyTableOption(opts *Options) From 5e4f7d69c2702b63c622e47618a3ecdd9ef07f39 Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Fri, 4 Oct 2024 10:44:55 +0000 Subject: [PATCH 10/16] Fix changelog --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 686bfc2b8..befe8bd7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ * Support bulk upsert from scv, arrow and ydb internal formats in table client -* Fixed reporting of `ydb_go_sdk_ydb_query_session_create_latency` and `ydb_go_sdk_query_session_count` metrics depends on details -* Allowed skip column for `ScanStruct` by tag `-` +* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled +* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled +* Allowed skip column for `ScanStruct` by tag `-` ## v3.81.4 * Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0` From 5700cf77ef6ae11244bdaa6584b7b7048e711626 Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Fri, 4 Oct 2024 12:09:02 +0000 Subject: [PATCH 11/16] Remove BulkUpsertData implementations from public --- table/table.go | 26 ++-- tests/integration/table_bulk_upsert_test.go | 128 +++++++++++++------- 2 files changed, 97 insertions(+), 57 deletions(-) diff --git a/table/table.go b/table/table.go index 0173324d4..17d77b86d 100644 --- a/table/table.go +++ b/table/table.go @@ -596,17 +596,23 @@ type 
BulkUpsertData interface { ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error } -type BulkUpsertRows struct { +type bulkUpsertRows struct { Rows value.Value } -func (data BulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { +func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { req.Rows = value.ToYDB(data.Rows, a) return nil } -type BulkUpsertCsv struct { +func NewBulkUpsertRows(rows value.Value) bulkUpsertRows { + return bulkUpsertRows{ + Rows: rows, + } +} + +type bulkUpsertCsv struct { Data []byte Options []CsvFormatOption } @@ -615,7 +621,7 @@ type CsvFormatOption interface { ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) } -func (data BulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { +func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { req.Data = data.Data var err error @@ -631,8 +637,8 @@ func (data BulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *Bu return err } -func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) BulkUpsertCsv { - return BulkUpsertCsv{ +func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) bulkUpsertCsv { + return bulkUpsertCsv{ Data: data, Options: opts, } @@ -717,7 +723,7 @@ func WithCsvSkipRows(count uint32) CsvFormatOption { return &csvSkipRowsOption{count} } -type BulkUpsertArrow struct { +type bulkUpsertArrow struct { Data []byte Options []ArrowFormatOption } @@ -726,7 +732,7 @@ type ArrowFormatOption interface { ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) } -func (data BulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { +func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { req.Data = data.Data var err error @@ -742,8 +748,8 @@ func (data BulkUpsertArrow) 
ApplyBulkUpsertRequest(a *allocator.Allocator, req * return err } -func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) BulkUpsertArrow { - return BulkUpsertArrow{ +func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) bulkUpsertArrow { + return bulkUpsertArrow{ Data: data, Options: opts, } diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index 82465b31a..265d79b1e 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -17,7 +17,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) -func TestTableBulkUpsert(t *testing.T) { +func TestTableBulkUpsertSession(t *testing.T) { var ( scope = newScope(t) driver = scope.Driver() @@ -39,58 +39,40 @@ func TestTableBulkUpsert(t *testing.T) { return s.BulkUpsert(ctx, tablePath, types.ListValue(rows...)) }) scope.Require.NoError(err) -} -func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + for i := int64(0); i < 10; i++ { + val := fmt.Sprintf("value for %v", i) + assertIdValue(scope.Ctx, t, tablePath, i, val) + } +} - db, err := ydb.Open(ctx, - os.Getenv("YDB_CONNECTION_STRING"), - // ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), +func TestTableBulkUpsert(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() ) - require.NoError(t, err) - err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { - res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` WHERE id = %d", tableName, id), nil) - if err != nil { - return err - } - err = res.NextResultSetErr(ctx) - if err != nil { - return err - } - require.EqualValues(t, 1, res.ResultSetCount()) - if !res.NextRow() { - if err = res.Err(); err != nil { - return err - } - return fmt.Errorf("unexpected 
empty result set") - } - var resultVal *string - err = res.ScanNamed( - named.Optional("val", &resultVal), - ) - if err != nil { - return err - } - if val != nil { - require.NotEmpty(t, resultVal) - require.EqualValues(t, *val, *resultVal) - } else { - require.Nil(t, resultVal) - } - return res.Err() - }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent()) - require.NoError(t, err) -} + // upsert + var rows []types.Value -func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) { - assertIdValueImpl(ctx, t, tableName, id, &val) -} + for i := int64(0); i < 10; i++ { + val := fmt.Sprintf("value for %v", i) + rows = append(rows, types.StructValue( + types.StructFieldValue("id", types.Int64Value(i)), + types.StructFieldValue("val", types.TextValue(val)), + )) + } -func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) { - assertIdValueImpl(ctx, t, tableName, id, nil) + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertRows( + types.ListValue(rows...), + )) + scope.Require.NoError(err) + + for i := int64(0); i < 10; i++ { + val := fmt.Sprintf("value for %v", i) + assertIdValue(scope.Ctx, t, tablePath, i, val) + } } func TestTableCsvBulkUpsert(t *testing.T) { @@ -209,3 +191,55 @@ func TestTableArrowBulkUpsert(t *testing.T) { assertIdValue(scope.Ctx, t, tablePath, 123, "data1") assertIdValue(scope.Ctx, t, tablePath, 234, "data2") } + +func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + // ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), + ) + require.NoError(t, err) + err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { + res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` 
WHERE id = %d", tableName, id), nil) + if err != nil { + return err + } + err = res.NextResultSetErr(ctx) + if err != nil { + return err + } + require.EqualValues(t, 1, res.ResultSetCount()) + if !res.NextRow() { + if err = res.Err(); err != nil { + return err + } + return fmt.Errorf("unexpected empty result set") + } + var resultVal *string + err = res.ScanNamed( + named.Optional("val", &resultVal), + ) + if err != nil { + return err + } + if val != nil { + require.NotEmpty(t, resultVal) + require.EqualValues(t, *val, *resultVal) + } else { + require.Nil(t, resultVal) + } + + return res.Err() + }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent()) + require.NoError(t, err) +} + +func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) { + assertIdValueImpl(ctx, t, tableName, id, &val) +} + +func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) { + assertIdValueImpl(ctx, t, tableName, id, nil) +} From 53608c267f5dc07e0854c49263599b4a5d855fd5 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 4 Oct 2024 18:37:36 +0300 Subject: [PATCH 12/16] changed visibility of bulk upsert data types and interfaces --- table/table.go | 24 ++++++++++----------- tests/integration/table_bulk_upsert_test.go | 12 +++++------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/table/table.go b/table/table.go index 17d77b86d..51b80ff32 100644 --- a/table/table.go +++ b/table/table.go @@ -606,7 +606,7 @@ func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *B return nil } -func NewBulkUpsertRows(rows value.Value) bulkUpsertRows { +func BulkUpsertDataRows(rows value.Value) bulkUpsertRows { return bulkUpsertRows{ Rows: rows, } @@ -614,10 +614,10 @@ func NewBulkUpsertRows(rows value.Value) bulkUpsertRows { type bulkUpsertCsv struct { Data []byte - Options []CsvFormatOption + Options []csvFormatOption } -type CsvFormatOption interface { 
+type csvFormatOption interface { ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) } @@ -637,7 +637,7 @@ func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *Bu return err } -func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) bulkUpsertCsv { +func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv { return bulkUpsertCsv{ Data: data, Options: opts, @@ -674,7 +674,7 @@ func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { } // First not skipped line is a CSV header (list of column names). -func WithCsvHeader() CsvFormatOption { +func WithCsvHeader() csvFormatOption { return &csvHeaderOption{} } @@ -689,7 +689,7 @@ func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) erro } // String value that would be interpreted as NULL. -func WithCsvNullValue(value []byte) CsvFormatOption { +func WithCsvNullValue(value []byte) csvFormatOption { return &csvNullValueOption{value} } @@ -704,7 +704,7 @@ func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) erro } // Fields delimiter in CSV file. It's "," if not set. -func WithCsvDelimiter(value []byte) CsvFormatOption { +func WithCsvDelimiter(value []byte) csvFormatOption { return &csvDelimiterOption{value} } @@ -719,16 +719,16 @@ func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error } // Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file. 
-func WithCsvSkipRows(count uint32) CsvFormatOption { +func WithCsvSkipRows(count uint32) csvFormatOption { return &csvSkipRowsOption{count} } type bulkUpsertArrow struct { Data []byte - Options []ArrowFormatOption + Options []arrowFormatOption } -type ArrowFormatOption interface { +type arrowFormatOption interface { ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) } @@ -748,7 +748,7 @@ func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req * return err } -func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) bulkUpsertArrow { +func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow { return bulkUpsertArrow{ Data: data, Options: opts, @@ -786,6 +786,6 @@ func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) err return nil } -func WithArrowSchema(schema []byte) ArrowFormatOption { +func WithArrowSchema(schema []byte) arrowFormatOption { return &arrowSchemaOption{schema} } diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go index 265d79b1e..fc0906d40 100644 --- a/tests/integration/table_bulk_upsert_test.go +++ b/tests/integration/table_bulk_upsert_test.go @@ -64,7 +64,7 @@ func TestTableBulkUpsert(t *testing.T) { )) } - err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertRows( + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataRows( types.ListValue(rows...), )) scope.Require.NoError(err) @@ -86,7 +86,7 @@ func TestTableCsvBulkUpsert(t *testing.T) { 42,"text42" 43,"text43"` - err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( []byte(csv), table.WithCsvHeader(), )) @@ -107,7 +107,7 @@ func TestTableCsvBulkUpsertDelimiter(t *testing.T) { 42:"text42" 43:"text43"` - err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + err := 
driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( []byte(csv), table.WithCsvHeader(), table.WithCsvDelimiter([]byte(":")), @@ -129,7 +129,7 @@ func TestTableCsvBulkUpsertNullValue(t *testing.T) { 42,hello 43,hello world` - err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( []byte(csv), table.WithCsvHeader(), table.WithCsvNullValue([]byte("hello")), @@ -157,7 +157,7 @@ id,val ` - err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertCsv( + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( []byte(csv), table.WithCsvHeader(), table.WithCsvSkipRows(2), @@ -182,7 +182,7 @@ func TestTableArrowBulkUpsert(t *testing.T) { schema, err := os.ReadFile("testdata/bulk_upsert_test_schema.arrow") scope.Require.NoError(err) - err = driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertArrow( + err = driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataArrow( []byte(data), table.WithArrowSchema(schema), )) From af3be446563283cb88a081185fbde25f3f6e2eb9 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 4 Oct 2024 18:40:07 +0300 Subject: [PATCH 13/16] CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8c24ca62..cd11d03c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Support bulk upsert from scv, arrow and ydb internal formats in table client +* Supported `db.Table().BulkUpsert()` from scv, arrow and ydb rows formats ## v3.82.0 * Fixed error on experimental `TopicListener.Close` From 865024a6b4f373c1974b857be903a3ea0d2701cc Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 6 Oct 2024 15:02:07 +0300 Subject: [PATCH 14/16] replaced ensure... 
to explicit constructors of entities --- internal/table/client.go | 34 +++++--- table/table.go | 175 +++++++++++++++------------------------ 2 files changed, 90 insertions(+), 119 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index bc98e66ef..55e2af776 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -5,7 +5,6 @@ import ( "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" @@ -286,35 +285,44 @@ func (c *Client) BulkUpsert( a := allocator.New() defer a.Free() - config := c.retryOptions(opts...) - config.RetryOptions = append(config.RetryOptions, retry.WithIdempotent(true)) + attempts, config := 0, c.retryOptions(opts...) + config.RetryOptions = append(config.RetryOptions, + retry.WithIdempotent(true), + retry.WithTrace(&trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + }), + ) - attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx, + onDone := trace.TableOnBulkUpsert(config.Trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"), ) defer func() { onDone(finalErr, attempts) }() - request := Ydb_Table.BulkUpsertRequest{ - Table: tableName, - } - finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request)) - if finalErr != nil { - return finalErr + request, err := data.ToYDB(a, tableName) + if err != nil { + return xerrors.WithStackTrace(err) } - finalErr = retry.Retry(ctx, + err = retry.Retry(ctx, func(ctx context.Context) (err error) { attempts++ - _, err = c.client.BulkUpsert(ctx, &request) + _, err = c.client.BulkUpsert(ctx, request) return err }, config.RetryOptions..., ) + if err != nil { + return xerrors.WithStackTrace(err) + 
} - return xerrors.WithStackTrace(finalErr) + return nil } func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) { diff --git a/table/table.go b/table/table.go index 51b80ff32..231d2a426 100644 --- a/table/table.go +++ b/table/table.go @@ -588,87 +588,71 @@ func WithTrace(t trace.Table) traceOption { //nolint:gocritic return traceOption{t: &t} } -type ( - BulkUpsertRequest Ydb_Table.BulkUpsertRequest -) - type BulkUpsertData interface { - ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error + ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) } type bulkUpsertRows struct { - Rows value.Value + rows value.Value } -func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Rows = value.ToYDB(data.Rows, a) - - return nil +func (data bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + return &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Rows: value.ToYDB(data.rows, a), + }, nil } func BulkUpsertDataRows(rows value.Value) bulkUpsertRows { return bulkUpsertRows{ - Rows: rows, + rows: rows, } } type bulkUpsertCsv struct { - Data []byte - Options []csvFormatOption + data []byte + opts []csvFormatOption } type csvFormatOption interface { - ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) + applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error) } -func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Data = data.Data +func (data bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + } + ) - var err error - for _, opt 
:= range data.Options { + for _, opt := range data.opts { if opt != nil { - err = opt.ApplyCsvFormatOption(req) - if err != nil { - return err + if err := opt.applyCsvFormatOption(dataFormat); err != nil { + return nil, err } } } - return err + request.DataFormat = dataFormat + + return request, nil } func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv { return bulkUpsertCsv{ - Data: data, - Options: opts, - } -} - -func ensureCsvDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.CsvSettings) { - if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings); ok { - if settings.CsvSettings == nil { - settings.CsvSettings = &Ydb_Formats.CsvSettings{} - } - - return settings.CsvSettings - } - - req.DataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ - CsvSettings: &Ydb_Formats.CsvSettings{}, + data: data, + opts: opts, } - - settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings) - if !ok { - return nil - } - - return settings.CsvSettings } type csvHeaderOption struct{} -func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).Header = true +func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Header = true return nil } @@ -678,114 +662,93 @@ func WithCsvHeader() csvFormatOption { return &csvHeaderOption{} } -type csvNullValueOption struct { - Value []byte -} +type csvNullValueOption []byte -func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).NullValue = opt.Value +func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.NullValue = nullValue return nil } // String value that would be interpreted as NULL. 
func WithCsvNullValue(value []byte) csvFormatOption { - return &csvNullValueOption{value} + return csvNullValueOption(value) } -type csvDelimiterOption struct { - Value []byte -} +type csvDelimiterOption []byte -func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).Delimiter = opt.Value +func (delimeter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Delimiter = delimeter return nil } // Fields delimiter in CSV file. It's "," if not set. func WithCsvDelimiter(value []byte) csvFormatOption { - return &csvDelimiterOption{value} + return csvDelimiterOption(value) } -type csvSkipRowsOption struct { - Count uint32 -} +type csvSkipRowsOption uint32 -func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).SkipRows = opt.Count +func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.SkipRows = uint32(skipRows) return nil } // Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file. 
-func WithCsvSkipRows(count uint32) csvFormatOption { - return &csvSkipRowsOption{count} +func WithCsvSkipRows(skipRows uint32) csvFormatOption { + return csvSkipRowsOption(skipRows) } type bulkUpsertArrow struct { - Data []byte - Options []arrowFormatOption + data []byte + opts []arrowFormatOption } type arrowFormatOption interface { - ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) + applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error) } -func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Data = data.Data +func (data bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, + } + ) - var err error - for _, opt := range data.Options { + for _, opt := range data.opts { if opt != nil { - err = opt.ApplyArrowFormatOption(req) - if err != nil { - return err + if err := opt.applyArrowFormatOption(dataFormat); err != nil { + return nil, err } } } - return err + request.DataFormat = dataFormat + + return request, nil } func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow { return bulkUpsertArrow{ - Data: data, - Options: opts, + data: data, + opts: opts, } } -func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) { - if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok { - if settings.ArrowBatchSettings == nil { - settings.ArrowBatchSettings = &Ydb_Formats.ArrowBatchSettings{} - } - - return settings.ArrowBatchSettings - } - - req.DataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ - ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, - } - - settings, ok := 
req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) - if !ok { - return nil - } - - return settings.ArrowBatchSettings -} - -type arrowSchemaOption struct { - Schema []byte -} +type arrowSchemaOption []byte -func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) error { - ensureArrowDataFormatSettings(req).Schema = opt.Schema +func (schema arrowSchemaOption) applyArrowFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) error { + dataFormat.ArrowBatchSettings.Schema = schema return nil } func WithArrowSchema(schema []byte) arrowFormatOption { - return &arrowSchemaOption{schema} + return arrowSchemaOption(schema) } From 0ea4a311d119333a4eccdb0d6b555dd40d50221a Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 6 Oct 2024 15:20:16 +0300 Subject: [PATCH 15/16] splitted trace.BulkUpsert{Start,Done}Info for client and session --- internal/table/client.go | 6 +++--- internal/table/session.go | 2 +- table/table.go | 4 +++- trace/table.go | 15 ++++++++++++++- trace/table_gtrace.go | 21 ++++++++++----------- 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 55e2af776..e463eba16 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -29,7 +29,6 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config return &Client{ clock: config.Clock(), config: config, - client: Ydb_Table_V1.NewTableServiceClient(cc), cc: cc, build: func(ctx context.Context) (s *session, err error) { return newSession(ctx, cc, config) @@ -89,7 +88,6 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config type Client struct { // read-only fields config *config.Config - client Ydb_Table_V1.TableServiceClient build sessionBuilder cc grpc.ClientConnInterface clock clockwork.Clock @@ -309,10 +307,12 @@ func (c *Client) BulkUpsert( return xerrors.WithStackTrace(err) } + client := 
Ydb_Table_V1.NewTableServiceClient(c.cc) + err = retry.Retry(ctx, func(ctx context.Context) (err error) { attempts++ - _, err = c.client.BulkUpsert(ctx, request) + _, err = client.BulkUpsert(ctx, request) return err }, diff --git a/internal/table/session.go b/internal/table/session.go index c6b23c335..9acbe5de3 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -1265,7 +1265,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value ) defer func() { defer a.Free() - onDone(err, 1) + onDone(err) }() for _, opt := range opts { diff --git a/table/table.go b/table/table.go index 231d2a426..961a994c3 100644 --- a/table/table.go +++ b/table/table.go @@ -743,7 +743,9 @@ func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow type arrowSchemaOption []byte -func (schema arrowSchemaOption) applyArrowFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) error { +func (schema arrowSchemaOption) applyArrowFormatOption( + dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings, +) error { dataFormat.ArrowBatchSettings.Schema = schema return nil diff --git a/trace/table.go b/trace/table.go index 3e378d3ba..fe0574215 100644 --- a/trace/table.go +++ b/trace/table.go @@ -35,7 +35,7 @@ type ( OnSessionKeepAlive func(TableKeepAliveStartInfo) func(TableKeepAliveDoneInfo) // Query events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnSessionBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) + OnSessionBulkUpsert func(TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnSessionQueryPrepare func(TablePrepareDataQueryStartInfo) func(TablePrepareDataQueryDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -151,6 +151,19 @@ type ( Attempts int } // Internals: 
https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertDoneInfo struct { + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableSessionDeleteStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index ee982df07..a9e6127fb 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -351,7 +351,7 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { { h1 := t.OnSessionBulkUpsert h2 := x.OnSessionBulkUpsert - ret.OnSessionBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + ret.OnSessionBulkUpsert = func(t TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -359,14 +359,14 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } - var r, r1 func(TableBulkUpsertDoneInfo) + var r, r1 func(TableSessionBulkUpsertDoneInfo) if h1 != nil { r = h1(t) } if h2 != nil { r1 = h2(t) } - return func(t TableBulkUpsertDoneInfo) { + return func(t TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -1067,16 +1067,16 @@ func (t *Table) onSessionKeepAlive(t1 TableKeepAliveStartInfo) func(TableKeepAli } 
return res } -func (t *Table) onSessionBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { +func (t *Table) onSessionBulkUpsert(t1 TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { fn := t.OnSessionBulkUpsert if fn == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } res := fn(t1) if res == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } @@ -1436,15 +1436,14 @@ func TableOnSessionKeepAlive(t *Table, c *context.Context, call call, session se } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(_ error, attempts int) { - var p TableBulkUpsertStartInfo +func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(error) { + var p TableSessionBulkUpsertStartInfo p.Context = c p.Call = call res := t.onSessionBulkUpsert(p) - return func(e error, attempts int) { - var p TableBulkUpsertDoneInfo + return func(e error) { + var p TableSessionBulkUpsertDoneInfo p.Error = e - p.Attempts = attempts res(p) } } From 6b2fbf8bb82d67153c7512140e359da71bf5100a Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 6 Oct 2024 15:38:11 +0300 Subject: [PATCH 16/16] added test for bulk upsert data constructors --- table/table_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/table/table_test.go b/table/table_test.go index 568c51068..7d92ed5b7 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -4,7 +4,11 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" 
"github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) @@ -62,3 +66,144 @@ func TestQueryParameters_String(t *testing.T) { }) } } + +func TestBulkUpsertData(t *testing.T) { + for _, tt := range []struct { + name string + data table.BulkUpsertData + request *Ydb_Table.BulkUpsertRequest + }{ + { + name: "Rows", + data: table.BulkUpsertDataRows(types.ListValue( + types.Uint64Value(123), + types.Uint64Value(321), + )), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Rows: &Ydb.TypedValue{ + Type: &Ydb.Type{ + Type: &Ydb.Type_ListType{ + ListType: &Ydb.ListType{ + Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UINT64}}, + }, + }, + }, + Value: &Ydb.Value{ + Items: []*Ydb.Value{ + { + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 123, + }, + }, + { + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 321, + }, + }, + }, + }, + }, + }, + }, + { + name: "Csv", + data: table.BulkUpsertDataCsv([]byte("123")), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + }, + }, + }, + { + name: "CsvWithDelimeter", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvDelimiter([]byte(";"))), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{ + Delimiter: []byte(";"), + }, + }, + }, + }, + { + name: "CsvWithHeader", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvHeader()), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{ + Header: true, + }, + }, + }, + }, + { + name: "CsvWithNullValue", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvNullValue([]byte("null"))), + request: &Ydb_Table.BulkUpsertRequest{ + Table: 
"test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{ + NullValue: []byte("null"), + }, + }, + }, + }, + { + name: "CsvWithNullValue", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvSkipRows(30)), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{ + SkipRows: 30, + }, + }, + }, + }, + { + name: "Arrow", + data: table.BulkUpsertDataArrow([]byte("123")), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, + }, + }, + }, + { + name: "ArrowWithSchema", + data: table.BulkUpsertDataArrow([]byte("123"), + table.WithArrowSchema([]byte("schema")), + ), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{ + Schema: []byte("schema"), + }, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + a := allocator.New() + request, err := tt.data.ToYDB(a, "test") + require.NoError(t, err) + require.Equal(t, + tt.request.String(), + request.String(), + ) + }) + } +}