Skip to content

Commit

Permalink
Merge pull request #1497 from ydb-platform/bulk-upsert
Browse files Browse the repository at this point in the history
Support of BulkUpsert over table client instead table session
  • Loading branch information
asmyasnikov authored Oct 6, 2024
2 parents e5f377a + 6b2fbf8 commit c7d991f
Show file tree
Hide file tree
Showing 12 changed files with 710 additions and 23 deletions.
10 changes: 6 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
* Supported `db.Table().BulkUpsert()` from CSV, Arrow and YDB rows formats

## v3.82.0
* Fixed error on experimental TopicListener.Close
* Fixed error on experimental `TopicListener.Close`
* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
* Allowed skip column for `ScanStruct` by tag `-`
* Allowed skip column for `ScanStruct` by tag `-`

## v3.81.4
* Returned `topicwriter.ErrQueueLimitExceed`, accidentally removed in `v3.81.0`
Expand All @@ -14,15 +16,15 @@
* Removed `experimental` comment for query service client

## v3.81.1
* Fixed nil pointer dereference panic on failed `ydb.Open`
* Fixed nil pointer dereference panic on failed `ydb.Open`
* Added IP discovery. The server can report its own IP address and the target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.

## v3.81.0
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
* Added write to topics within transactions

## v3.80.10
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
* Refactored experimental topic iterators in `topicsugar` package

## v3.80.9
Expand Down
61 changes: 61 additions & 0 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"

"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
Expand Down Expand Up @@ -264,6 +266,65 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
}, config.RetryOptions...)
}

// BulkUpsert upserts a batch of rows non-transactionally through the table
// client itself, without taking a session from the session pool.
//
// data is converted once into a single BulkUpsertRequest and the request is
// re-sent as an idempotent operation until it succeeds, fails with a
// non-retryable error, or ctx is done. Success means all rows were upserted;
// on error some rows may have been written and some not.
func (c *Client) BulkUpsert(
	ctx context.Context,
	tableName string,
	data table.BulkUpsertData,
	opts ...table.Option,
) (finalErr error) {
	// Guard against calls on a nil or already-closed client.
	if c == nil {
		return xerrors.WithStackTrace(errNilClient)
	}

	if c.isClosed() {
		return xerrors.WithStackTrace(errClosedClient)
	}

	a := allocator.New()
	defer a.Free()

	// attempts is reported to the table trace in the deferred onDone below.
	// It is bumped manually inside the retry operation and additionally
	// synchronized with the retry loop's own counter via the OnRetry hook,
	// which overwrites it when the loop finishes.
	attempts, config := 0, c.retryOptions(opts...)
	config.RetryOptions = append(config.RetryOptions,
		// The whole request is safe to re-send unchanged, so retries are idempotent.
		retry.WithIdempotent(true),
		retry.WithTrace(&trace.Retry{
			OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
				return func(info trace.RetryLoopDoneInfo) {
					attempts = info.Attempts
				}
			},
		}),
	)

	// Start the table trace span before any work so that conversion errors
	// below are also reported through onDone.
	onDone := trace.TableOnBulkUpsert(config.Trace, &ctx,
		stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"),
	)
	defer func() {
		onDone(finalErr, attempts)
	}()

	// Build the request once; it is reused unchanged across retries.
	request, err := data.ToYDB(a, tableName)
	if err != nil {
		return xerrors.WithStackTrace(err)
	}

	client := Ydb_Table_V1.NewTableServiceClient(c.cc)

	err = retry.Retry(ctx,
		func(ctx context.Context) (err error) {
			attempts++
			_, err = client.BulkUpsert(ctx, request)

			return err
		},
		config.RetryOptions...,
	)
	if err != nil {
		return xerrors.WithStackTrace(err)
	}

	return nil
}

func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
defer func() {
Expand Down
1 change: 0 additions & 1 deletion internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,6 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value
onDone = trace.TableOnSessionBulkUpsert(
s.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"),
s,
)
)
defer func() {
Expand Down
176 changes: 176 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
Expand Down Expand Up @@ -68,6 +70,12 @@ type Client interface {
// If op TxOperation return non nil - transaction will be rollback
// Warning: if context without deadline or cancellation func than DoTx can run indefinitely
DoTx(ctx context.Context, op TxOperation, opts ...Option) error

// BulkUpsert upserts a batch of rows non-transactionally.
//
// Returns success only when all rows were successfully upserted. In case of an error some rows might
// be upserted and some might not.
BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error
}

type SessionStatus = string
Expand Down Expand Up @@ -179,6 +187,7 @@ type Session interface {
opts ...options.ExecuteScanQueryOption,
) (_ result.StreamResult, err error)

// Deprecated: use Client instance instead.
BulkUpsert(
ctx context.Context,
table string,
Expand Down Expand Up @@ -578,3 +587,170 @@ func (opt traceOption) ApplyTableOption(opts *Options) {
func WithTrace(t trace.Table) traceOption { //nolint:gocritic
return traceOption{t: &t}
}

// BulkUpsertData describes a payload accepted by Client.BulkUpsert and knows
// how to convert itself into a single BulkUpsertRequest for the given table.
type BulkUpsertData interface {
	ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error)
}

// bulkUpsertRows carries a ready-made YDB list value with the rows to upsert.
type bulkUpsertRows struct {
	rows value.Value
}

// ToYDB converts the wrapped rows value into a BulkUpsertRequest for tableName.
func (r bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	request := &Ydb_Table.BulkUpsertRequest{
		Table: tableName,
		Rows:  value.ToYDB(r.rows, a),
	}

	return request, nil
}

// BulkUpsertDataRows wraps a YDB rows value as bulk upsert data.
func BulkUpsertDataRows(rows value.Value) bulkUpsertRows {
	return bulkUpsertRows{rows: rows}
}

// bulkUpsertCsv carries raw CSV bytes plus optional CSV format settings.
type bulkUpsertCsv struct {
	data []byte
	opts []csvFormatOption
}

// csvFormatOption mutates the CSV settings of a bulk upsert request.
type csvFormatOption interface {
	applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error)
}

// ToYDB builds a BulkUpsertRequest carrying the CSV payload together with the
// settings accumulated from the options.
func (c bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	settings := &Ydb_Table.BulkUpsertRequest_CsvSettings{
		CsvSettings: &Ydb_Formats.CsvSettings{},
	}

	for _, opt := range c.opts {
		if opt == nil {
			continue
		}
		if err := opt.applyCsvFormatOption(settings); err != nil {
			return nil, err
		}
	}

	return &Ydb_Table.BulkUpsertRequest{
		Table:      tableName,
		Data:       c.data,
		DataFormat: settings,
	}, nil
}

// BulkUpsertDataCsv wraps raw CSV bytes (with optional format options) as
// bulk upsert data.
func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv {
	return bulkUpsertCsv{
		data: data,
		opts: opts,
	}
}

// csvHeaderOption marks the first non-skipped CSV line as a header row.
type csvHeaderOption struct{}

func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.Header = true

	return nil
}

// WithCsvHeader declares that the first not-skipped line is a CSV header
// (a list of column names).
func WithCsvHeader() csvFormatOption {
	return &csvHeaderOption{}
}

// csvNullValueOption is the byte string interpreted as NULL in CSV data.
type csvNullValueOption []byte

func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.NullValue = nullValue

	return nil
}

// WithCsvNullValue sets the string value that would be interpreted as NULL.
func WithCsvNullValue(value []byte) csvFormatOption {
	return csvNullValueOption(value)
}

// csvDelimiterOption is the field delimiter used when parsing CSV data.
type csvDelimiterOption []byte

// Receiver renamed from the misspelled "delimeter" to "delimiter".
func (delimiter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.Delimiter = delimiter

	return nil
}

// WithCsvDelimiter sets the fields delimiter in the CSV file. It's "," if not set.
func WithCsvDelimiter(value []byte) csvFormatOption {
	return csvDelimiterOption(value)
}

// csvSkipRowsOption is the number of leading rows to skip before CSV data.
type csvSkipRowsOption uint32

func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.SkipRows = uint32(skipRows)

	return nil
}

// WithCsvSkipRows sets the number of rows to skip before CSV data.
// It should be present only in the first upsert of a CSV file.
func WithCsvSkipRows(skipRows uint32) csvFormatOption {
	return csvSkipRowsOption(skipRows)
}

// bulkUpsertArrow carries a serialized Arrow batch plus optional settings.
type bulkUpsertArrow struct {
	data []byte
	opts []arrowFormatOption
}

// arrowFormatOption mutates the Arrow batch settings of a bulk upsert request.
type arrowFormatOption interface {
	applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error)
}

// ToYDB builds a BulkUpsertRequest carrying the Arrow payload together with
// the settings accumulated from the options.
func (b bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	settings := &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
		ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
	}

	for _, opt := range b.opts {
		if opt == nil {
			continue
		}
		if err := opt.applyArrowFormatOption(settings); err != nil {
			return nil, err
		}
	}

	return &Ydb_Table.BulkUpsertRequest{
		Table:      tableName,
		Data:       b.data,
		DataFormat: settings,
	}, nil
}

// BulkUpsertDataArrow wraps a serialized Arrow batch (with optional format
// options) as bulk upsert data.
func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow {
	return bulkUpsertArrow{
		data: data,
		opts: opts,
	}
}

// arrowSchemaOption carries the serialized Arrow schema of the batch.
type arrowSchemaOption []byte

func (schema arrowSchemaOption) applyArrowFormatOption(
	dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings,
) error {
	dataFormat.ArrowBatchSettings.Schema = schema

	return nil
}

// WithArrowSchema sets the serialized Arrow schema of the uploaded batch.
func WithArrowSchema(schema []byte) arrowFormatOption {
	return arrowSchemaOption(schema)
}
Loading

0 comments on commit c7d991f

Please sign in to comment.