Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of BulkUpsert over table client instead of table session #1497

Merged
merged 20 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
* Supported `db.Table().BulkUpsert()` from csv, arrow and ydb rows formats

## v3.82.0
* Fixed error on experimental TopicListener.Close
* Fixed error on experimental `TopicListener.Close`
* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
* Allowed skip column for `ScanStruct` by tag `-`
* Allowed skip column for `ScanStruct` by tag `-`

## v3.81.4
* Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0`
Expand All @@ -14,15 +16,15 @@
* Removed `experimantal` comment for query service client

## v3.81.1
* Fixed nil pointer dereference panic on failed `ydb.Open`
* Fixed nil pointer dereference panic on failed `ydb.Open`
* Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.

## v3.81.0
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
* Added write to topics within transactions

## v3.80.10
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
* Refactored experimental topic iterators in `topicsugar` package

## v3.80.9
Expand Down
61 changes: 61 additions & 0 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"

"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
Expand Down Expand Up @@ -264,6 +266,65 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
}, config.RetryOptions...)
}

// BulkUpsert upserts a batch of rows non-transactionally through the table
// client, without an explicit session. The data argument supplies the request
// body in one of the supported formats (YDB rows, CSV or Arrow).
//
// The call is retried as idempotent (retry.WithIdempotent(true)); the final
// error and the attempt count are reported to the table trace on completion.
// Returns an error when the client is nil or already closed.
func (c *Client) BulkUpsert(
	ctx context.Context,
	tableName string,
	data table.BulkUpsertData,
	opts ...table.Option,
) (finalErr error) {
	if c == nil {
		return xerrors.WithStackTrace(errNilClient)
	}

	if c.isClosed() {
		return xerrors.WithStackTrace(errClosedClient)
	}

	// Allocator backs the protobuf request built by data.ToYDB below.
	a := allocator.New()
	defer a.Free()

	// attempts is surfaced to the trace callback in the deferred onDone.
	// It is incremented manually inside the retry operation and additionally
	// overwritten with the retry loop's own counter when the loop finishes.
	attempts, config := 0, c.retryOptions(opts...)
	config.RetryOptions = append(config.RetryOptions,
		retry.WithIdempotent(true),
		retry.WithTrace(&trace.Retry{
			OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
				return func(info trace.RetryLoopDoneInfo) {
					attempts = info.Attempts
				}
			},
		}),
	)

	onDone := trace.TableOnBulkUpsert(config.Trace, &ctx,
		stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"),
	)
	defer func() {
		onDone(finalErr, attempts)
	}()

	request, err := data.ToYDB(a, tableName)
	if err != nil {
		return xerrors.WithStackTrace(err)
	}

	client := Ydb_Table_V1.NewTableServiceClient(c.cc)

	err = retry.Retry(ctx,
		func(ctx context.Context) (err error) {
			attempts++
			_, err = client.BulkUpsert(ctx, request)

			return err
		},
		config.RetryOptions...,
	)
	if err != nil {
		return xerrors.WithStackTrace(err)
	}

	return nil
}

func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
defer func() {
Expand Down
1 change: 0 additions & 1 deletion internal/table/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,6 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value
onDone = trace.TableOnSessionBulkUpsert(
s.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"),
s,
)
)
defer func() {
Expand Down
176 changes: 176 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
Expand Down Expand Up @@ -68,6 +70,12 @@ type Client interface {
// If op TxOperation return non nil - transaction will be rollback
// Warning: if context without deadline or cancellation func than DoTx can run indefinitely
DoTx(ctx context.Context, op TxOperation, opts ...Option) error

// BulkUpsert upserts a batch of rows non-transactionally.
//
// Returns success only when all rows were successfully upserted. In case of an error some rows might
// be upserted and some might not.
BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error
}

type SessionStatus = string
Expand Down Expand Up @@ -179,6 +187,7 @@ type Session interface {
opts ...options.ExecuteScanQueryOption,
) (_ result.StreamResult, err error)

// Deprecated: use Client instance instead.
BulkUpsert(
ctx context.Context,
table string,
Expand Down Expand Up @@ -578,3 +587,170 @@ func (opt traceOption) ApplyTableOption(opts *Options) {
// WithTrace returns an option that installs the given trace.Table
// for observing table client events.
func WithTrace(t trace.Table) traceOption { //nolint:gocritic
	return traceOption{t: &t}
}

// BulkUpsertData builds the underlying BulkUpsertRequest for one concrete
// payload format (YDB rows, CSV or Arrow — see the constructors below).
type BulkUpsertData interface {
	ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error)
}

// bulkUpsertRows carries bulk-upsert data as a YDB value of rows.
type bulkUpsertRows struct {
	rows value.Value
}

// ToYDB builds a BulkUpsertRequest with the rows serialized through the allocator.
func (data bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	return &Ydb_Table.BulkUpsertRequest{
		Table: tableName,
		Rows:  value.ToYDB(data.rows, a),
	}, nil
}

// BulkUpsertDataRows makes BulkUpsertData from a YDB value of rows.
func BulkUpsertDataRows(rows value.Value) bulkUpsertRows {
	return bulkUpsertRows{
		rows: rows,
	}
}

// bulkUpsertCsv carries bulk-upsert data as raw CSV bytes plus optional
// CSV format settings.
type bulkUpsertCsv struct {
	data []byte
	opts []csvFormatOption
}

// csvFormatOption mutates the CSV settings of a bulk-upsert request.
type csvFormatOption interface {
	applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error)
}

// ToYDB builds a BulkUpsertRequest carrying the raw CSV bytes together with
// the CSV settings accumulated from the configured options.
func (data bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	settings := &Ydb_Table.BulkUpsertRequest_CsvSettings{
		CsvSettings: &Ydb_Formats.CsvSettings{},
	}

	// Apply each non-nil option; the first failure aborts the conversion.
	for _, option := range data.opts {
		if option == nil {
			continue
		}
		if err := option.applyCsvFormatOption(settings); err != nil {
			return nil, err
		}
	}

	return &Ydb_Table.BulkUpsertRequest{
		Table:      tableName,
		Data:       data.data,
		DataFormat: settings,
	}, nil
}

// BulkUpsertDataCsv makes BulkUpsertData from raw CSV bytes and optional
// CSV format settings.
func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv {
	return bulkUpsertCsv{data: data, opts: opts}
}

// csvHeaderOption marks the CSV payload as having a header line.
type csvHeaderOption struct{}

func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.Header = true

	return nil
}

// WithCsvHeader makes the first not skipped line be interpreted as a CSV
// header (the list of column names).
func WithCsvHeader() csvFormatOption {
	return &csvHeaderOption{}
}

// csvNullValueOption carries the byte string that denotes NULL in the CSV data.
type csvNullValueOption []byte

func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.NullValue = nullValue

	return nil
}

// WithCsvNullValue sets the string value that would be interpreted as NULL.
func WithCsvNullValue(value []byte) csvFormatOption {
	return csvNullValueOption(value)
}

// csvDelimiterOption carries the field delimiter for the CSV data.
type csvDelimiterOption []byte

// Receiver renamed from the misspelled "delimeter" to "delimiter".
func (delimiter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.Delimiter = delimiter

	return nil
}

// WithCsvDelimiter sets the fields delimiter in the CSV file. It's "," if not set.
func WithCsvDelimiter(value []byte) csvFormatOption {
	return csvDelimiterOption(value)
}

// csvSkipRowsOption carries the number of leading rows to skip in the CSV data.
type csvSkipRowsOption uint32

func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error {
	dataFormat.CsvSettings.SkipRows = uint32(skipRows)

	return nil
}

// WithCsvSkipRows sets the number of rows to skip before the CSV data.
// It should be present only in the first upsert of a CSV file.
func WithCsvSkipRows(skipRows uint32) csvFormatOption {
	return csvSkipRowsOption(skipRows)
}

// bulkUpsertArrow carries bulk-upsert data as raw Arrow bytes plus optional
// Arrow batch settings.
type bulkUpsertArrow struct {
	data []byte
	opts []arrowFormatOption
}

// arrowFormatOption mutates the Arrow batch settings of a bulk-upsert request.
type arrowFormatOption interface {
	applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error)
}

// ToYDB builds a BulkUpsertRequest carrying the raw Arrow bytes together with
// the Arrow batch settings accumulated from the configured options.
func (data bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) {
	settings := &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
		ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
	}

	// Apply each non-nil option; the first failure aborts the conversion.
	for _, option := range data.opts {
		if option == nil {
			continue
		}
		if err := option.applyArrowFormatOption(settings); err != nil {
			return nil, err
		}
	}

	return &Ydb_Table.BulkUpsertRequest{
		Table:      tableName,
		Data:       data.data,
		DataFormat: settings,
	}, nil
}

// BulkUpsertDataArrow makes BulkUpsertData from raw Arrow bytes and optional
// Arrow batch settings.
func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow {
	return bulkUpsertArrow{data: data, opts: opts}
}

// arrowSchemaOption carries the serialized Arrow schema bytes.
type arrowSchemaOption []byte

func (schema arrowSchemaOption) applyArrowFormatOption(
	dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings,
) error {
	dataFormat.ArrowBatchSettings.Schema = schema

	return nil
}

// WithArrowSchema sets the serialized schema of the Arrow batch data.
func WithArrowSchema(schema []byte) arrowFormatOption {
	return arrowSchemaOption(schema)
}
Loading
Loading