From 0ea4a311d119333a4eccdb0d6b555dd40d50221a Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Sun, 6 Oct 2024 15:20:16 +0300 Subject: [PATCH] splitted trace.BulkUpsert{Start,Done}Info for client and session --- internal/table/client.go | 6 +++--- internal/table/session.go | 2 +- table/table.go | 4 +++- trace/table.go | 15 ++++++++++++++- trace/table_gtrace.go | 21 ++++++++++----------- 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 55e2af776..e463eba16 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -29,7 +29,6 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config return &Client{ clock: config.Clock(), config: config, - client: Ydb_Table_V1.NewTableServiceClient(cc), cc: cc, build: func(ctx context.Context) (s *session, err error) { return newSession(ctx, cc, config) @@ -89,7 +88,6 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config type Client struct { // read-only fields config *config.Config - client Ydb_Table_V1.TableServiceClient build sessionBuilder cc grpc.ClientConnInterface clock clockwork.Clock @@ -309,10 +307,12 @@ func (c *Client) BulkUpsert( return xerrors.WithStackTrace(err) } + client := Ydb_Table_V1.NewTableServiceClient(c.cc) + err = retry.Retry(ctx, func(ctx context.Context) (err error) { attempts++ - _, err = c.client.BulkUpsert(ctx, request) + _, err = client.BulkUpsert(ctx, request) return err }, diff --git a/internal/table/session.go b/internal/table/session.go index c6b23c335..9acbe5de3 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -1265,7 +1265,7 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value ) defer func() { defer a.Free() - onDone(err, 1) + onDone(err) }() for _, opt := range opts { diff --git a/table/table.go b/table/table.go index 231d2a426..961a994c3 100644 --- a/table/table.go +++ b/table/table.go @@ -743,7 +743,9 @@ func BulkUpsertDataArrow(data []byte, opts ...arrowFormatOption) bulkUpsertArrow type arrowSchemaOption []byte -func (schema arrowSchemaOption) applyArrowFormatOption(dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings) error { +func (schema arrowSchemaOption) applyArrowFormatOption( + dataFormat *Ydb_Table.BulkUpsertRequest_ArrowBatchSettings, +) error { dataFormat.ArrowBatchSettings.Schema = schema return nil diff --git a/trace/table.go b/trace/table.go index 3e378d3ba..fe0574215 100644 --- a/trace/table.go +++ b/trace/table.go @@ -35,7 +35,7 @@ type ( OnSessionKeepAlive func(TableKeepAliveStartInfo) func(TableKeepAliveDoneInfo) // Query events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnSessionBulkUpsert func(TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) + OnSessionBulkUpsert func(TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnSessionQueryPrepare func(TablePrepareDataQueryStartInfo) func(TablePrepareDataQueryDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -151,6 +151,19 @@ type ( Attempts int } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TableSessionBulkUpsertDoneInfo struct { + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TableSessionDeleteStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index ee982df07..a9e6127fb 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -351,7 +351,7 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { { h1 := t.OnSessionBulkUpsert h2 := x.OnSessionBulkUpsert - ret.OnSessionBulkUpsert = func(t TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { + ret.OnSessionBulkUpsert = func(t TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -359,14 +359,14 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } - var r, r1 func(TableBulkUpsertDoneInfo) + var r, r1 func(TableSessionBulkUpsertDoneInfo) if h1 != nil { r = h1(t) } if h2 != nil { r1 = h2(t) } - return func(t TableBulkUpsertDoneInfo) { + return func(t TableSessionBulkUpsertDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -1067,16 +1067,16 @@ func (t *Table) onSessionKeepAlive(t1 TableKeepAliveStartInfo) func(TableKeepAli } return res } -func (t *Table) onSessionBulkUpsert(t1 TableBulkUpsertStartInfo) func(TableBulkUpsertDoneInfo) { +func (t *Table) onSessionBulkUpsert(t1 TableSessionBulkUpsertStartInfo) func(TableSessionBulkUpsertDoneInfo) { fn := t.OnSessionBulkUpsert if fn == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } res := fn(t1) if res == nil { - return func(TableBulkUpsertDoneInfo) { + return func(TableSessionBulkUpsertDoneInfo) { return } } @@ -1436,15 +1436,14 @@ func TableOnSessionKeepAlive(t *Table, c *context.Context, call call, session se } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(_ error, attempts int) { - var p TableBulkUpsertStartInfo +func TableOnSessionBulkUpsert(t *Table, c *context.Context, call call) func(error) { + var p TableSessionBulkUpsertStartInfo p.Context = c p.Call = call res := t.onSessionBulkUpsert(p) - return func(e error, attempts int) { - var p TableBulkUpsertDoneInfo + return func(e error) { + var p TableSessionBulkUpsertDoneInfo p.Error = e - p.Attempts = attempts res(p) } }