diff --git a/CHANGELOG.md b/CHANGELOG.md
index d7433d935..cd11d03c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,10 @@
+* Supported `db.Table().BulkUpsert()` from CSV, Arrow and YDB rows formats
+
 ## v3.82.0
-* Fixed error on experimental TopicListener.Close
+* Fixed error on experimental `TopicListener.Close`
 * Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
 * Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
-* Allowed skip column for `ScanStruct` by tag `-` 
+* Allowed skip column for `ScanStruct` by tag `-`
 
 ## v3.81.4
 * Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0`
@@ -14,7 +16,7 @@
 * Removed `experimantal` comment for query service client
 
 ## v3.81.1
-* Fixed nil pointer dereference panic on failed `ydb.Open` 
+* Fixed nil pointer dereference panic on failed `ydb.Open`
 * Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.
 
 ## v3.81.0
@@ -22,7 +24,7 @@
 * Added write to topics within transactions
 
 ## v3.80.10
-* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage 
+* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
 * Refactored experimental topic iterators in `topicsugar` package
 
 ## v3.80.9
diff --git a/internal/table/client.go b/internal/table/client.go
index 3982c0ce2..e463eba16 100644
--- a/internal/table/client.go
+++ b/internal/table/client.go
@@ -4,8 +4,10 @@ import (
 	"context"
 
 	"github.com/jonboulle/clockwork"
+	"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
 	"google.golang.org/grpc"
 
+	"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
 	"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
 	"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
 	"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
@@ -264,6 +266,65 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
 	}, config.RetryOptions...)
 }
 
+func (c *Client) BulkUpsert(
+	ctx context.Context,
+	tableName string,
+	data table.BulkUpsertData,
+	opts ...table.Option,
+) (finalErr error) {
+	if c == nil {
+		return xerrors.WithStackTrace(errNilClient)
+	}
+
+	if c.isClosed() {
+		return xerrors.WithStackTrace(errClosedClient)
+	}
+
+	a := allocator.New()
+	defer a.Free()
+
+	attempts, config := 0, c.retryOptions(opts...)
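+	// The request is retried below with the operation marked idempotent: a bulk upsert
+	// re-applies the same rows on every attempt, and the trace callback records the
+	// number of attempts made.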
+ config.RetryOptions = append(config.RetryOptions, + retry.WithIdempotent(true), + retry.WithTrace(&trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + }), + ) + + onDone := trace.TableOnBulkUpsert(config.Trace, &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"), + ) + defer func() { + onDone(finalErr, attempts) + }() + + request, err := data.ToYDB(a, tableName) + if err != nil { + return xerrors.WithStackTrace(err) + } + + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + + err = retry.Retry(ctx, + func(ctx context.Context) (err error) { + attempts++ + _, err = client.BulkUpsert(ctx, request) + + return err + }, + config.RetryOptions..., + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil +} + func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) { if panicCallback := c.config.PanicCallback(); panicCallback != nil { defer func() { diff --git a/internal/table/session.go b/internal/table/session.go index cd87683d1..9acbe5de3 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -1261,7 +1261,6 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value onDone = trace.TableOnSessionBulkUpsert( s.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"), - s, ) ) defer func() { diff --git a/table/table.go b/table/table.go index 1ef386664..961a994c3 100644 --- a/table/table.go +++ b/table/table.go @@ -4,8 +4,10 @@ import ( "context" "time" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" @@ -68,6 +70,12 @@ type Client interface { // If op TxOperation return non nil - transaction will be rollback // Warning: if context without deadline or cancellation func than DoTx can run indefinitely DoTx(ctx context.Context, op TxOperation, opts ...Option) error + + // BulkUpsert upserts a batch of rows non-transactionally. + // + // Returns success only when all rows were successfully upserted. In case of an error some rows might + // be upserted and some might not. + BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error } type SessionStatus = string @@ -179,6 +187,7 @@ type Session interface { opts ...options.ExecuteScanQueryOption, ) (_ result.StreamResult, err error) + // Deprecated: use Client instance instead. 
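+	// The client-level db.Table().BulkUpsert() retries the request itself and does not
+	// require an explicit session.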
BulkUpsert( ctx context.Context, table string, @@ -578,3 +587,170 @@ func (opt traceOption) ApplyTableOption(opts *Options) { func WithTrace(t trace.Table) traceOption { //nolint:gocritic return traceOption{t: &t} } + +type BulkUpsertData interface { + ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) +} + +type bulkUpsertRows struct { + rows value.Value +} + +func (data bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + return &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Rows: value.ToYDB(data.rows, a), + }, nil +} + +func BulkUpsertDataRows(rows value.Value) bulkUpsertRows { + return bulkUpsertRows{ + rows: rows, + } +} + +type bulkUpsertCsv struct { + data []byte + opts []csvFormatOption +} + +type csvFormatOption interface { + applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error) +} + +func (data bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + } + ) + + for _, opt := range data.opts { + if opt != nil { + if err := opt.applyCsvFormatOption(dataFormat); err != nil { + return nil, err + } + } + } + + request.DataFormat = dataFormat + + return request, nil +} + +func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv { + return bulkUpsertCsv{ + data: data, + opts: opts, + } +} + +type csvHeaderOption struct{} + +func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Header = true + + return nil +} + +// First not skipped line is a CSV header (list of column names). +func WithCsvHeader() csvFormatOption { + return &csvHeaderOption{} +} + +type csvNullValueOption []byte + +func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.NullValue = nullValue + + return nil +} + +// String value that would be interpreted as NULL. +func WithCsvNullValue(value []byte) csvFormatOption { + return csvNullValueOption(value) +} + +type csvDelimiterOption []byte + +func (delimeter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Delimiter = delimeter + + return nil +} + +// Fields delimiter in CSV file. It's "," if not set. +func WithCsvDelimiter(value []byte) csvFormatOption { + return csvDelimiterOption(value) +} + +type csvSkipRowsOption uint32 + +func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.SkipRows = uint32(skipRows) + + return nil +} + +// Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file. 
+func WithCsvSkipRows(skipRows uint32) csvFormatOption { + return csvSkipRowsOption(skipRows) +} + +type bulkUpsertArrow struct { + data []byte + opts []arrowFormatOption +} + +type arrowFormatOption interface { + applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error) +} + +func (data bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, + } + ) + + for _, opt := range data.opts { + if opt != nil { + if err := opt.applyArrowFormatOption(dataFormat); err != nil { + return nil, err + } + } + } + + request.DataFormat = dataFormat + + return request, nil +} + +func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow { + return bulkUpsertArrow{ + data: data, + opts: opts, + } +} + +type arrowSchemaOption []byte + +func (schema arrowSchemaOption) applyArrowFormatOption( + dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings, +) error { + dataFormat.ArrowBatchSettings.Schema = schema + + return nil +} + +func WithArrowSchema(schema []byte) arrowFormatOption { + return arrowSchemaOption(schema) +} diff --git a/table/table_test.go b/table/table_test.go index 568c51068..7d92ed5b7 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -4,7 +4,11 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) @@ -62,3 +66,144 @@ func TestQueryParameters_String(t *testing.T) { }) } } + +func TestBulkUpsertData(t *testing.T) { + for _, tt := range []struct { + name string + data table.BulkUpsertData + request *Ydb_Table.BulkUpsertRequest + }{ + { + name: "Rows", + data: table.BulkUpsertDataRows(types.ListValue( + types.Uint64Value(123), + types.Uint64Value(321), + )), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Rows: &Ydb.TypedValue{ + Type: &Ydb.Type{ + Type: &Ydb.Type_ListType{ + ListType: &Ydb.ListType{ + Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UINT64}}, + }, + }, + }, + Value: &Ydb.Value{ + Items: []*Ydb.Value{ + { + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 123, + }, + }, + { + Value: &Ydb.Value_Uint64Value{ + Uint64Value: 321, + }, + }, + }, + }, + }, + }, + }, + { + name: "Csv", + data: table.BulkUpsertDataCsv([]byte("123")), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + }, + }, + }, + { + name: "CsvWithDelimeter", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvDelimiter([]byte(";"))), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{ + Delimiter: []byte(";"), + }, + }, + }, + }, + { + name: "CsvWithHeader", + data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvHeader()), + request: &Ydb_Table.BulkUpsertRequest{ + Table: "test", + Data: []byte("123"), + DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: 
&Ydb_Formats.CsvSettings{
+						Header: true,
+					},
+				},
+			},
+		},
+		{
+			name: "CsvWithNullValue",
+			data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvNullValue([]byte("null"))),
+			request: &Ydb_Table.BulkUpsertRequest{
+				Table: "test",
+				Data:  []byte("123"),
+				DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{
+					CsvSettings: &Ydb_Formats.CsvSettings{
+						NullValue: []byte("null"),
+					},
+				},
+			},
+		},
+		{
+			name: "CsvWithSkipRows",
+			data: table.BulkUpsertDataCsv([]byte("123"), table.WithCsvSkipRows(30)),
+			request: &Ydb_Table.BulkUpsertRequest{
+				Table: "test",
+				Data:  []byte("123"),
+				DataFormat: &Ydb_Table.BulkUpsertRequest_CsvSettings{
+					CsvSettings: &Ydb_Formats.CsvSettings{
+						SkipRows: 30,
+					},
+				},
+			},
+		},
+		{
+			name: "Arrow",
+			data: table.BulkUpsertDataArrow([]byte("123")),
+			request: &Ydb_Table.BulkUpsertRequest{
+				Table: "test",
+				Data:  []byte("123"),
+				DataFormat: &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
+					ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
+				},
+			},
+		},
+		{
+			name: "ArrowWithSchema",
+			data: table.BulkUpsertDataArrow([]byte("123"),
+				table.WithArrowSchema([]byte("schema")),
+			),
+			request: &Ydb_Table.BulkUpsertRequest{
+				Table: "test",
+				Data:  []byte("123"),
+				DataFormat: &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
+					ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{
+						Schema: []byte("schema"),
+					},
+				},
+			},
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			a := allocator.New()
+			request, err := tt.data.ToYDB(a, "test")
+			require.NoError(t, err)
+			require.Equal(t,
+				tt.request.String(),
+				request.String(),
+			)
+		})
+	}
+}
diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go
index c64f329f2..91f1c892d 100644
--- a/tests/integration/helpers_test.go
+++ b/tests/integration/helpers_test.go
@@ -180,7 +180,7 @@ func (scope *scopeT) Folder() string {
 				scope.Require.NoError(sugar.RemoveRecursive(scope.Ctx, driver, folderPath))
 			}
 		}
-		scope.Logf("Createing folder done: %v", folderPath)
+		scope.Logf("Creating folder done: %v", folderPath)
 		return fixenv.NewGenericResultWithCleanup(folderPath, clean), nil
 	}
 	return fixenv.CacheResult(scope.Env, f)
@@ -310,7 +310,7 @@ func (scope *scopeT) TableName(opts ...func(t *tableNameParams)) string {
 		createTableQueryTemplate: `
 		PRAGMA TablePathPrefix("{{.TablePathPrefix}}");
 		CREATE TABLE {{.TableName}} (
-			id Int64 NOT NULL, 
+			id Int64 NOT NULL,
 			val Text,
 			PRIMARY KEY (id)
 		)
diff --git a/tests/integration/table_bulk_upsert_test.go b/tests/integration/table_bulk_upsert_test.go
index 33523cb93..fc0906d40 100644
--- a/tests/integration/table_bulk_upsert_test.go
+++ b/tests/integration/table_bulk_upsert_test.go
@@ -6,16 +6,23 @@ package integration
 import (
 	"context"
 	"fmt"
+	"os"
 	"testing"
 
+	"github.com/stretchr/testify/require"
+
+	"github.com/ydb-platform/ydb-go-sdk/v3"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table"
+	"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
 	"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
 )
 
-func TestTableBulkUpsert(t *testing.T) {
-	scope := newScope(t)
-	driver := scope.Driver()
-	tablePath := scope.TablePath()
+func TestTableBulkUpsertSession(t *testing.T) {
+	var (
+		scope     = newScope(t)
+		driver    = scope.Driver()
+		tablePath = scope.TablePath()
+	)
 
 	// upsert
 	var rows []types.Value
@@ -32,4 +39,207 @@
 		return s.BulkUpsert(ctx, tablePath, types.ListValue(rows...))
 	})
 	scope.Require.NoError(err)
+
+	for i := int64(0); i < 10; i++ {
+		val := fmt.Sprintf("value for %v", i)
+		assertIdValue(scope.Ctx, t, 
tablePath, i, val) + } +} + +func TestTableBulkUpsert(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + // upsert + var rows []types.Value + + for i := int64(0); i < 10; i++ { + val := fmt.Sprintf("value for %v", i) + rows = append(rows, types.StructValue( + types.StructFieldValue("id", types.Int64Value(i)), + types.StructFieldValue("val", types.TextValue(val)), + )) + } + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataRows( + types.ListValue(rows...), + )) + scope.Require.NoError(err) + + for i := int64(0); i < 10; i++ { + val := fmt.Sprintf("value for %v", i) + assertIdValue(scope.Ctx, t, tablePath, i, val) + } +} + +func TestTableCsvBulkUpsert(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + csv := `id,val +42,"text42" +43,"text43"` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( + []byte(csv), + table.WithCsvHeader(), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "text42") + assertIdValue(scope.Ctx, t, tablePath, 43, "text43") +} + +func TestTableCsvBulkUpsertDelimiter(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + csv := `id:val +42:"text42" +43:"text43"` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvDelimiter([]byte(":")), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "text42") + assertIdValue(scope.Ctx, t, tablePath, 43, "text43") +} + +func TestTableCsvBulkUpsertNullValue(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + csv := `id,val +42,hello +43,hello world` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvNullValue([]byte("hello")), + )) + scope.Require.NoError(err) + + assertIdValueNil(scope.Ctx, t, tablePath, 42) + assertIdValue(scope.Ctx, t, tablePath, 43, "hello world") +} + +func TestTableCsvBulkUpsertSkipRows(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + // Empty row are OK after skipped rows + csv := `First skip row + Second skip row + +id,val +42,123 +43,456 + +` + + err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataCsv( + []byte(csv), + table.WithCsvHeader(), + table.WithCsvSkipRows(2), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 42, "123") + assertIdValue(scope.Ctx, t, tablePath, 43, "456") +} + +func TestTableArrowBulkUpsert(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver() + tablePath = scope.TablePath() + ) + + // data & schema generated with make_test_arrow.py script + data, err := os.ReadFile("testdata/bulk_upsert_test_data.arrow") + scope.Require.NoError(err) + + schema, err := os.ReadFile("testdata/bulk_upsert_test_schema.arrow") + scope.Require.NoError(err) + + err = driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataArrow( + []byte(data), + table.WithArrowSchema(schema), + )) + scope.Require.NoError(err) + + assertIdValue(scope.Ctx, t, tablePath, 123, "data1") + assertIdValue(scope.Ctx, t, tablePath, 234, "data2") +} + +func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) { + ctx, cancel := 
context.WithCancel(context.Background()) + defer cancel() + + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + // ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), + ) + require.NoError(t, err) + err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) { + res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` WHERE id = %d", tableName, id), nil) + if err != nil { + return err + } + err = res.NextResultSetErr(ctx) + if err != nil { + return err + } + require.EqualValues(t, 1, res.ResultSetCount()) + if !res.NextRow() { + if err = res.Err(); err != nil { + return err + } + return fmt.Errorf("unexpected empty result set") + } + var resultVal *string + err = res.ScanNamed( + named.Optional("val", &resultVal), + ) + if err != nil { + return err + } + if val != nil { + require.NotEmpty(t, resultVal) + require.EqualValues(t, *val, *resultVal) + } else { + require.Nil(t, resultVal) + } + + return res.Err() + }, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent()) + require.NoError(t, err) +} + +func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) { + assertIdValueImpl(ctx, t, tableName, id, &val) +} + +func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) { + assertIdValueImpl(ctx, t, tableName, id, nil) } diff --git a/tests/integration/testdata/bulk_upsert_test_data.arrow b/tests/integration/testdata/bulk_upsert_test_data.arrow new file mode 100644 index 000000000..c1cba98bc Binary files /dev/null and b/tests/integration/testdata/bulk_upsert_test_data.arrow differ diff --git a/tests/integration/testdata/bulk_upsert_test_schema.arrow b/tests/integration/testdata/bulk_upsert_test_schema.arrow new file mode 100644 index 000000000..92a5908a3 Binary files /dev/null and b/tests/integration/testdata/bulk_upsert_test_schema.arrow differ diff --git a/tests/integration/testdata/make_test_arrow.py b/tests/integration/testdata/make_test_arrow.py new file mode 100755 index 000000000..e4b07f802 --- /dev/null +++ b/tests/integration/testdata/make_test_arrow.py @@ -0,0 +1,17 @@ +#! 
/usr/bin/env python3 +import pyarrow + +schema = pyarrow.schema([("id", pyarrow.int64()), ("val", pyarrow.utf8())]) + +with open("bulk_upsert_test_schema.arrow", "bw") as schema_file: + schema_file.write(schema.serialize().to_pybytes()) + +data = [ + pyarrow.array([123, 234]), + pyarrow.array(["data1", "data2"]), +] + +batch = pyarrow.record_batch(data, schema=schema) + +with open("bulk_upsert_test_data.arrow", "bw") as data_file: + data_file.write(batch.serialize()) diff --git a/trace/table.go b/trace/table.go index 5589327b1..fe0574215 100644 --- a/trace/table.go +++ b/trace/table.go @@ -23,6 +23,8 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnDoTx func(TableDoTxStartInfo) func(TableDoTxDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnCreateSession func(TableCreateSessionStartInfo) func(TableCreateSessionDoneInfo) // Session events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -33,7 +35,7 @@ type ( OnSessionKeepAlive func(TableKeepAliveStartInfo) func(TableKeepAliveDoneInfo) // Query events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnSessionBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) + OnSessionBulkUpsert func(TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnSessionQueryPrepare func(TablePrepareDataQueryStartInfo) func(TablePrepareDataQueryDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -142,10 +144,23 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - Session sessionInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableBulkUpsertDoneInfo struct { + Error error + Attempts int + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. 
+ // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertDoneInfo struct { Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index c548157b1..a9e6127fb 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -173,6 +173,41 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } } + { + h1 := t.OnBulkUpsert + h2 := x.OnBulkUpsert + ret.OnBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(TableBulkUpsertDoneInfo) + if h1 != nil { + r = h1(t) + } + if h2 != nil { + r1 = h2(t) + } + return func(t TableBulkUpsertDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(t) + } + if r1 != nil { + r1(t) + } + } + } + } { h1 := t.OnCreateSession h2 := x.OnCreateSession @@ -316,7 +351,7 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { { h1 := t.OnSessionBulkUpsert h2 := x.OnSessionBulkUpsert - ret.OnSessionBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + ret.OnSessionBulkUpsert = func(t TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -324,14 +359,14 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } - var r, r1 func(TableBulkUpsertDoneInfo) + var r, r1 func(TableSessionBulkUpsertDoneInfo) if h1 != nil { r = h1(t) } if h2 != nil { r1 = h2(t) } - return func(t TableBulkUpsertDoneInfo) { + return func(t TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -957,6 +992,21 @@ func (t *Table) onDoTx(t1 TableDoTxStartInfo) func(TableDoTxDoneInfo) { } return res } +func (t *Table) onBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + fn := t.OnBulkUpsert + if fn == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + res := fn(t1) + if res == nil { + return func(TableBulkUpsertDoneInfo) { + return + } + } + return res +} func (t *Table) onCreateSession(t1 TableCreateSessionStartInfo) func(TableCreateSessionDoneInfo) { fn := t.OnCreateSession if fn == nil { @@ -1017,16 +1067,16 @@ func (t *Table) onSessionKeepAlive(t1 TableKeepAliveStartInfo) func(TableKeepAli } return res } -func (t *Table) onSessionBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { +func (t *Table) onSessionBulkUpsert(t1 TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { fn := t.OnSessionBulkUpsert if fn == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } res := fn(t1) if res == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } @@ -1320,6 +1370,19 @@ func TableOnDoTx(t *Table, c *context.Context, call call, label string, idempote } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnBulkUpsert(t *Table, c *context.Context, call call) func(_ error, attempts int) 
{ + var p TableBulkUpsertStartInfo + p.Context = c + p.Call = call + res := t.onBulkUpsert(p) + return func(e error, attempts int) { + var p TableBulkUpsertDoneInfo + p.Error = e + p.Attempts = attempts + res(p) + } +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TableOnCreateSession(t *Table, c *context.Context, call call) func(session sessionInfo, attempts int, _ error) { var p TableCreateSessionStartInfo p.Context = c @@ -1373,14 +1436,13 @@ func TableOnSessionKeepAlive(t *Table, c *context.Context, call call, session se } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call, session sessionInfo) func(error) { - var p TableBulkUpsertStartInfo +func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(error) { + var p TableSessionBulkUpsertStartInfo p.Context = c p.Call = call - p.Session = session res := t.onSessionBulkUpsert(p) return func(e error) { - var p TableBulkUpsertDoneInfo + var p TableSessionBulkUpsertDoneInfo p.Error = e res(p) }
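Reviewer note (not part of the patch): a minimal usage sketch of the new client-level API, assuming a database reachable via YDB_CONNECTION_STRING and a target table with the id Int64 / val Text layout used by the integration tests above; the table path and row values are placeholders.

package main

import (
	"context"
	"os"

	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
	"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)

func main() {
	ctx := context.Background()

	// Assumes YDB_CONNECTION_STRING points at a reachable database.
	db, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
	if err != nil {
		panic(err)
	}
	defer db.Close(ctx)

	// Placeholder path; the table is assumed to have columns id Int64 and val Text.
	tablePath := "/local/path/to/table"

	// YDB rows format: a list of structs whose fields match the table columns.
	rows := types.ListValue(
		types.StructValue(
			types.StructFieldValue("id", types.Int64Value(1)),
			types.StructFieldValue("val", types.TextValue("first")),
		),
		types.StructValue(
			types.StructFieldValue("id", types.Int64Value(2)),
			types.StructFieldValue("val", types.TextValue("second")),
		),
	)
	if err := db.Table().BulkUpsert(ctx, tablePath, table.BulkUpsertDataRows(rows)); err != nil {
		panic(err)
	}

	// CSV format: raw bytes plus format options; the first non-skipped line is the header.
	csv := []byte("id,val\n3,third\n4,fourth")
	if err := db.Table().BulkUpsert(ctx, tablePath, table.BulkUpsertDataCsv(csv, table.WithCsvHeader())); err != nil {
		panic(err)
	}

	// Arrow batches follow the same pattern:
	// db.Table().BulkUpsert(ctx, tablePath, table.BulkUpsertDataArrow(data, table.WithArrowSchema(schema)))
}

Each variant maps directly onto Ydb_Table.BulkUpsertRequest: rows go into the Rows field, while CSV and Arrow payloads go into Data with the corresponding DataFormat settings, as table/table.go above shows.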