From a04d2be285d605ac19d8c88d0a7596d3d024c0a4 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 1 Nov 2024 18:21:07 +0100 Subject: [PATCH] fix --- internal/conn/conn.go | 10 ++++++++++ internal/query/client.go | 15 ++------------- internal/table/client.go | 9 +-------- internal/table/retry.go | 2 +- retry/retry.go | 17 ++++++++--------- 5 files changed, 22 insertions(+), 31 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 438fd2c59..8ac68efe9 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -604,6 +604,16 @@ func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn { return c } +func ModifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface { + if nodeID != 0 { + return conn.WithContextModifier(cc, func(ctx context.Context) context.Context { + return balancerContext.WithNodeID(ctx, nodeID) + }) + } else { + return cc + } +} + func New(e endpoint.Endpoint, config Config, opts ...option) Conn { return newConn(e, config, opts...) } diff --git a/internal/query/client.go b/internal/query/client.go index a6bb7700d..080a67789 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -12,7 +12,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" - balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" @@ -41,7 +40,7 @@ type ( closer.Closer Stats() pool.Stats - With(ctx context.Context, f func(ctx context.Context, s *Session) error, preferredNodeID uint32, opts ...retry.Option) error + With(ctx context.Context, f func(context.Context, *Session) error, nodeID uint32, opts ...retry.Option) error } Client struct { config *config.Config @@ -514,16 +513,6 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options return nil } -func modifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface { - if nodeID != 0 { - return conn.WithContextModifier(cc, func(ctx context.Context) context.Context { - return balancerContext.WithNodeID(ctx, nodeID) - }) - } else { - return cc - } -} - func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *Client { onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"), @@ -556,7 +545,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) * defer cancelCreate() s, err := createSession(createCtx, client, - session.WithConn(modifyConn(cc, nodeID)), + session.WithConn(conn.ModifyConn(cc, nodeID)), session.WithDeleteTimeout(cfg.SessionDeleteTimeout()), session.WithTrace(cfg.Trace()), ) diff --git a/internal/table/client.go b/internal/table/client.go index 5142402b4..8b4d786e1 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -2,14 +2,12 @@ package table import ( "context" - "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" - balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" @@ -43,12 +41,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config pool.WithCloseItemTimeout[*session, session](config.DeleteTimeout()), pool.WithClock[*session, session](config.Clock()), pool.WithCreateItemFunc[*session, session](func(ctx context.Context, nodeId uint32) (*session, error) { - if nodeId != 0 { - cc = conn.WithContextModifier(cc, func(ctx context.Context) context.Context { - return balancerContext.WithNodeID(ctx, nodeId) - }) - } - return newSession(ctx, cc, config) + return newSession(ctx, conn.ModifyConn(cc, nodeId), config) }), pool.WithTrace[*session, session](&pool.Trace{ OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { diff --git a/internal/table/retry.go b/internal/table/retry.go index dfa012852..1e4d52468 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -18,7 +18,7 @@ type sessionPool interface { closer.Closer Stats() pool.Stats - With(ctx context.Context, f func(ctx context.Context, s *session) error, preferredNodeID uint32, opts ...retry.Option) error + With(ctx context.Context, f func(ctx context.Context, s *session) error, nodeID uint32, opts ...retry.Option) error } func do( diff --git a/retry/retry.go b/retry/retry.go index 3b66ef5aa..4394b6146 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -19,15 +19,14 @@ import ( type retryOperation func(context.Context) (err error) type retryOptions struct { - label string - call call - trace *trace.Retry - idempotent bool - stackTrace bool - fastBackoff backoff.Backoff - slowBackoff backoff.Backoff - budget budget.Budget - preferredNodeID uint32 + label string + call call + trace *trace.Retry + idempotent bool + stackTrace bool + fastBackoff backoff.Backoff + slowBackoff backoff.Backoff + budget budget.Budget panicCallback func(e interface{}) }