From 69c3ad1dbd55a574ee35dc9e94eb96f06d249438 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 6 Oct 2024 15:02:07 +0300 Subject: [PATCH] replaced ensure... to explicit constructors of entities --- internal/table/client.go | 34 +++++--- table/table.go | 171 ++++++++++++++++----------------------- 2 files changed, 90 insertions(+), 115 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index bc98e66ef..55e2af776 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -5,7 +5,6 @@ import ( "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" @@ -286,35 +285,44 @@ func (c *Client) BulkUpsert( a := allocator.New() defer a.Free() - config := c.retryOptions(opts...) - config.RetryOptions = append(config.RetryOptions, retry.WithIdempotent(true)) + attempts, config := 0, c.retryOptions(opts...) + config.RetryOptions = append(config.RetryOptions, + retry.WithIdempotent(true), + retry.WithTrace(&trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + }), + ) - attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx, + onDone := trace.TableOnBulkUpsert(config.Trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"), ) defer func() { onDone(finalErr, attempts) }() - request := Ydb_Table.BulkUpsertRequest{ - Table: tableName, - } - finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request)) - if finalErr != nil { - return finalErr + request, err := data.ToYDB(a, tableName) + if err != nil { + return xerrors.WithStackTrace(err) } - finalErr = retry.Retry(ctx, + err = retry.Retry(ctx, func(ctx context.Context) (err error) { attempts++ - _, err = c.client.BulkUpsert(ctx, &request) + _, err = c.client.BulkUpsert(ctx, request) return err }, config.RetryOptions..., ) + if err != nil { + return xerrors.WithStackTrace(err) + } - return xerrors.WithStackTrace(finalErr) + return nil } func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) { diff --git a/table/table.go b/table/table.go index 51b80ff32..b2a149cb7 100644 --- a/table/table.go +++ b/table/table.go @@ -593,82 +593,70 @@ type ( ) type BulkUpsertData interface { - ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error + ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) } type bulkUpsertRows struct { - Rows value.Value + rows value.Value } -func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Rows = value.ToYDB(data.Rows, a) - - return nil +func (data bulkUpsertRows) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + return &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Rows: value.ToYDB(data.rows, a), + }, nil } func BulkUpsertDataRows(rows value.Value) bulkUpsertRows { return bulkUpsertRows{ - Rows: rows, + rows: rows, } } type bulkUpsertCsv struct { - Data []byte - Options []csvFormatOption + data []byte + opts []csvFormatOption } type csvFormatOption interface { - ApplyCsvFormatOption(req *BulkUpsertRequest) (err error) + applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) (err error) } -func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Data = data.Data +func (data bulkUpsertCsv) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ + CsvSettings: &Ydb_Formats.CsvSettings{}, + } + ) - var err error - for _, opt := range data.Options { + for _, opt := range data.opts { if opt != nil { - err = opt.ApplyCsvFormatOption(req) - if err != nil { - return err + if err := opt.applyCsvFormatOption(dataFormat); err != nil { + return nil, err } } } - return err + request.DataFormat = dataFormat + + return request, nil } func BulkUpsertDataCsv(data []byte, opts ...csvFormatOption) bulkUpsertCsv { return bulkUpsertCsv{ - Data: data, - Options: opts, - } -} - -func ensureCsvDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.CsvSettings) { - if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings); ok { - if settings.CsvSettings == nil { - settings.CsvSettings = &Ydb_Formats.CsvSettings{} - } - - return settings.CsvSettings + data: data, + opts: opts, } - - req.DataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{ - CsvSettings: &Ydb_Formats.CsvSettings{}, - } - - settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings) - if !ok { - return nil - } - - return settings.CsvSettings } type csvHeaderOption struct{} -func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).Header = true +func (opt *csvHeaderOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Header = true return nil } @@ -678,114 +666,93 @@ func WithCsvHeader() csvFormatOption { return &csvHeaderOption{} } -type csvNullValueOption struct { - Value []byte -} +type csvNullValueOption []byte -func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).NullValue = opt.Value +func (nullValue csvNullValueOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.NullValue = nullValue return nil } // String value that would be interpreted as NULL. func WithCsvNullValue(value []byte) csvFormatOption { - return &csvNullValueOption{value} + return csvNullValueOption(value) } -type csvDelimiterOption struct { - Value []byte -} +type csvDelimiterOption []byte -func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).Delimiter = opt.Value +func (delimeter csvDelimiterOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.Delimiter = delimeter return nil } // Fields delimiter in CSV file. It's "," if not set. func WithCsvDelimiter(value []byte) csvFormatOption { - return &csvDelimiterOption{value} + return csvDelimiterOption(value) } -type csvSkipRowsOption struct { - Count uint32 -} +type csvSkipRowsOption uint32 -func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error { - ensureCsvDataFormatSettings(req).SkipRows = opt.Count +func (skipRows csvSkipRowsOption) applyCsvFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_CsvSettings) error { + dataFormat.CsvSettings.SkipRows = uint32(skipRows) return nil } // Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file. -func WithCsvSkipRows(count uint32) csvFormatOption { - return &csvSkipRowsOption{count} +func WithCsvSkipRows(skipRows uint32) csvFormatOption { + return csvSkipRowsOption(skipRows) } type bulkUpsertArrow struct { - Data []byte - Options []arrowFormatOption + data []byte + opts []arrowFormatOption } type arrowFormatOption interface { - ApplyArrowFormatOption(req *BulkUpsertRequest) (err error) + applyArrowFormatOption(req *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) (err error) } -func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error { - req.Data = data.Data +func (data bulkUpsertArrow) ToYDB(a *allocator.Allocator, tableName string) (*Ydb_Table.BulkUpsertRequest, error) { + var ( + request = &Ydb_Table.BulkUpsertRequest{ + Table: tableName, + Data: data.data, + } + dataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ + ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, + } + ) - var err error - for _, opt := range data.Options { + for _, opt := range data.opts { if opt != nil { - err = opt.ApplyArrowFormatOption(req) - if err != nil { - return err + if err := opt.applyArrowFormatOption(dataFormat); err != nil { + return nil, err } } } - return err + request.DataFormat = dataFormat + + return request, nil } func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow { return bulkUpsertArrow{ - Data: data, - Options: opts, - } -} - -func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) { - if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok { - if settings.ArrowBatchSettings == nil { - settings.ArrowBatchSettings = &Ydb_Formats.ArrowBatchSettings{} - } - - return settings.ArrowBatchSettings + data: data, + opts: opts, } - - req.DataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{ - ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{}, - } - - settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) - if !ok { - return nil - } - - return settings.ArrowBatchSettings } -type arrowSchemaOption struct { - Schema []byte -} +type arrowSchemaOption []byte -func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) error { - ensureArrowDataFormatSettings(req).Schema = opt.Schema +func (schema arrowSchemaOption) applyArrowFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) error { + dataFormat.ArrowBatchSettings.Schema = schema return nil } func WithArrowSchema(schema []byte) arrowFormatOption { - return &arrowSchemaOption{schema} + return arrowSchemaOption(schema) }