From f400f655a3faa542b8b6df27618f59b95cf35b83 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 2 May 2024 17:59:04 +0300 Subject: [PATCH 1/3] * Added `trace.DriverConnStreamEvents` details bit --- CHANGELOG.md | 2 ++ log/driver.go | 8 ++++---- metrics/driver.go | 2 +- trace/details.go | 3 +++ 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aad167ba..3d90c0aa3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `trace.DriverConnStreamEvents` details bit + ## v3.66.1 * Added flush messages from buffer before close topic writer * Added Flush method for topic writer diff --git a/log/driver.go b/log/driver.go index fb9c8b1a1..7972d0f07 100644 --- a/log/driver.go +++ b/log/driver.go @@ -216,7 +216,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo ) func( trace.DriverConnNewStreamDoneInfo, ) { - if d.Details()&trace.DriverConnEvents == 0 { + if d.Details()&trace.DriverConnStreamEvents == 0 { return nil } ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "stream", "New") @@ -249,7 +249,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo OnConnStreamCloseSend: func(info trace.DriverConnStreamCloseSendStartInfo) func( trace.DriverConnStreamCloseSendDoneInfo, ) { - if d.Details()&trace.DriverConnEvents == 0 { + if d.Details()&trace.DriverConnStreamEvents == 0 { return nil } ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "stream", "CloseSend") @@ -271,7 +271,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo } }, OnConnStreamSendMsg: func(info trace.DriverConnStreamSendMsgStartInfo) func(trace.DriverConnStreamSendMsgDoneInfo) { - if d.Details()&trace.DriverConnEvents == 0 { + if d.Details()&trace.DriverConnStreamEvents == 0 { return nil } ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "stream", "SendMsg") @@ -293,7 +293,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo } }, OnConnStreamRecvMsg: func(info trace.DriverConnStreamRecvMsgStartInfo) func(trace.DriverConnStreamRecvMsgDoneInfo) { - if d.Details()&trace.DriverConnEvents == 0 { + if d.Details()&trace.DriverConnStreamEvents == 0 { return nil } ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "stream", "RecvMsg") diff --git a/metrics/driver.go b/metrics/driver.go index b477f8f3d..d6fbed12d 100644 --- a/metrics/driver.go +++ b/metrics/driver.go @@ -55,7 +55,7 @@ func driver(config Config) (t trace.Driver) { ) return func(info trace.DriverConnNewStreamDoneInfo) { - if config.Details()&trace.DriverConnEvents != 0 { + if config.Details()&trace.DriverConnStreamEvents != 0 { requests.With(map[string]string{ "status": errorBrief(info.Error), "method": string(method), diff --git a/trace/details.go b/trace/details.go index 124885135..a60fdb6d2 100644 --- a/trace/details.go +++ b/trace/details.go @@ -31,6 +31,7 @@ func (d Details) String() string { const ( DriverNetEvents Details = 1 << iota // for bitmask: 1, 2, 4, 8, 16, 32, ... DriverConnEvents + DriverConnStreamEvents DriverBalancerEvents DriverResolverEvents DriverRepeaterEvents @@ -79,6 +80,7 @@ const ( CoordinationEvents DriverEvents = DriverConnEvents | + DriverConnStreamEvents | DriverBalancerEvents | DriverResolverEvents | DriverRepeaterEvents | @@ -128,6 +130,7 @@ var ( DriverResolverEvents: "ydb.driver.resolver", DriverRepeaterEvents: "ydb.driver.repeater", DriverConnEvents: "ydb.driver.conn", + DriverConnStreamEvents: "ydb.driver.conn.stream", DriverCredentialsEvents: "ydb.driver.credentials", DiscoveryEvents: "ydb.discovery", From f887b4ddcc73d3e035b72b800e8d75d99712cd8b Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 2 May 2024 18:22:56 +0300 Subject: [PATCH 2/3] * Added `trace.Driver.OnConnStreamFinish` event --- CHANGELOG.md | 1 + internal/conn/conn.go | 55 ++++++++--------- internal/conn/grpc_client_stream.go | 81 +++++++++++++++---------- internal/xcontext/cancels_quard.go | 26 ++++---- internal/xcontext/cancels_quard_test.go | 13 ++-- trace/driver.go | 12 ++++ trace/driver_gtrace.go | 59 ++++++++++++++++++ 7 files changed, 167 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d90c0aa3..16dac2391 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ * Added `trace.DriverConnStreamEvents` details bit +* Added `trace.Driver.OnConnStreamFinish` event ## v3.66.1 * Added flush messages from buffer before close topic writer diff --git a/internal/conn/conn.go b/internal/conn/conn.go index d7e05bd42..a31e8e195 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -51,7 +51,7 @@ type Conn interface { type conn struct { mtx sync.RWMutex config Config // ro access - cc *grpc.ClientConn + grpcConn *grpc.ClientConn done chan struct{} endpoint endpoint.Endpoint // ro access closed bool @@ -121,7 +121,7 @@ func (c *conn) park(ctx context.Context) (err error) { return nil } - if c.cc == nil { + if c.grpcConn == nil { return nil } @@ -161,7 +161,7 @@ func (c *conn) setState(ctx context.Context, s State) State { func (c *conn) Unban(ctx context.Context) State { var newState State c.mtx.RLock() - cc := c.cc + cc := c.grpcConn c.mtx.RUnlock() if isAvailable(cc) { newState = Online @@ -186,8 +186,8 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) { c.mtx.Lock() defer c.mtx.Unlock() - if c.cc != nil { - return c.cc, nil + if c.grpcConn != nil { + return c.grpcConn, nil } if dialTimeout := c.config.DialTimeout(); dialTimeout > 0 { @@ -234,10 +234,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) { ) } - c.cc = cc + c.grpcConn = cc c.setState(ctx, Online) - return c.cc, nil + return c.grpcConn, nil } func (c *conn) onTransportError(ctx context.Context, cause error) { @@ -252,11 +252,11 @@ func isAvailable(raw *grpc.ClientConn) bool { // conn must be locked func (c *conn) close(ctx context.Context) (err error) { - if c.cc == nil { + if c.grpcConn == nil { return nil } - err = c.cc.Close() - c.cc = nil + err = c.grpcConn.Close() + c.grpcConn = nil c.setState(ctx, Offline) return c.wrapError(err) @@ -423,19 +423,23 @@ func (c *conn) NewStream( ctx, sentMark := markContext(meta.WithTraceID(ctx, traceID)) - ctx, cancel := xcontext.WithCancel(ctx) + ctx, cancel := c.childStreams.WithCancel(ctx) defer func() { if finalErr != nil { cancel() - } else { - c.childStreams.Remember(&cancel) } }() - s, err := cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(func(err error) { - cancel() - c.childStreams.Forget(&cancel) - }))...) + s := &grpcClientStream{ + parentConn: c, + streamCtx: ctx, + streamCancel: cancel, + wrapping: useWrapping, + traceID: traceID, + sentMark: sentMark, + } + + s.stream, err = cc.NewStream(ctx, desc, method, append(opts, grpc.OnFinish(s.finish))...) if err != nil { if xerrors.IsContextError(err) { return nil, xerrors.WithStackTrace(err) @@ -451,25 +455,16 @@ func (c *conn) NewStream( xerrors.WithTraceID(traceID), ) if sentMark.canRetry() { - return s, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream"))) + return nil, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream"))) } - return s, c.wrapError(err) + return nil, c.wrapError(err) } - return s, err + return nil, err } - return &grpcClientStream{ - ClientStream: s, - c: c, - wrapping: useWrapping, - traceID: traceID, - sentMark: sentMark, - onDone: func(ctx context.Context, md metadata.MD) { - meta.CallTrailerCallback(ctx, md) - }, - }, nil + return s, nil } func (c *conn) wrapError(err error) error { diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index ea7c08cfe..8cf4f7cba 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/wrap" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -15,18 +16,31 @@ import ( ) type grpcClientStream struct { - grpc.ClientStream - c *conn - wrapping bool - traceID string - sentMark *modificationMark - onDone func(ctx context.Context, md metadata.MD) + parentConn *conn + stream grpc.ClientStream + streamCtx context.Context //nolint:containedctx + streamCancel context.CancelFunc + wrapping bool + traceID string + sentMark *modificationMark +} + +func (s *grpcClientStream) Header() (metadata.MD, error) { + return s.stream.Header() +} + +func (s *grpcClientStream) Trailer() metadata.MD { + return s.stream.Trailer() +} + +func (s *grpcClientStream) Context() context.Context { + return s.stream.Context() } func (s *grpcClientStream) CloseSend() (err error) { var ( - ctx = s.Context() - onDone = trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &ctx, + ctx = s.streamCtx + onDone = trace.DriverOnConnStreamCloseSend(s.parentConn.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"), ) ) @@ -34,10 +48,10 @@ func (s *grpcClientStream) CloseSend() (err error) { onDone(err) }() - stop := s.c.lastUsage.Start() + stop := s.parentConn.lastUsage.Start() defer stop() - err = s.ClientStream.CloseSend() + err = s.stream.CloseSend() if err != nil { if xerrors.IsContextError(err) { @@ -48,7 +62,7 @@ func (s *grpcClientStream) CloseSend() (err error) { return s.wrapError( xerrors.Transport( err, - xerrors.WithAddress(s.c.Address()), + xerrors.WithAddress(s.parentConn.Address()), xerrors.WithTraceID(s.traceID), ), ) @@ -62,8 +76,8 @@ func (s *grpcClientStream) CloseSend() (err error) { func (s *grpcClientStream) SendMsg(m interface{}) (err error) { var ( - ctx = s.Context() - onDone = trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &ctx, + ctx = s.streamCtx + onDone = trace.DriverOnConnStreamSendMsg(s.parentConn.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"), ) ) @@ -71,10 +85,10 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { onDone(err) }() - stop := s.c.lastUsage.Start() + stop := s.parentConn.lastUsage.Start() defer stop() - err = s.ClientStream.SendMsg(m) + err = s.stream.SendMsg(m) if err != nil { if xerrors.IsContextError(err) { @@ -82,12 +96,12 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { } defer func() { - s.c.onTransportError(ctx, err) + s.parentConn.onTransportError(ctx, err) }() if s.wrapping { err = xerrors.Transport(err, - xerrors.WithAddress(s.c.Address()), + xerrors.WithAddress(s.parentConn.Address()), xerrors.WithTraceID(s.traceID), ) if s.sentMark.canRetry() { @@ -105,28 +119,31 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { return nil } +func (s *grpcClientStream) finish(err error) { + s.streamCancel() + trace.DriverOnConnStreamFinish(s.parentConn.config.Trace(), s.streamCtx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).finish"), err, + ) +} + func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { var ( - ctx = s.Context() - onDone = trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &ctx, + ctx = s.streamCtx + onDone = trace.DriverOnConnStreamRecvMsg(s.parentConn.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"), ) ) defer func() { onDone(err) - }() - - stop := s.c.lastUsage.Start() - defer stop() - - defer func() { if err != nil { - md := s.ClientStream.Trailer() - s.onDone(ctx, md) + meta.CallTrailerCallback(s.streamCtx, s.stream.Trailer()) } }() - err = s.ClientStream.RecvMsg(m) + stop := s.parentConn.lastUsage.Start() + defer stop() + + err = s.stream.RecvMsg(m) if err != nil { //nolint:nestif if xerrors.IsContextError(err) { @@ -135,13 +152,13 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { defer func() { if !xerrors.Is(err, io.EOF) { - s.c.onTransportError(ctx, err) + s.parentConn.onTransportError(ctx, err) } }() if s.wrapping { err = xerrors.Transport(err, - xerrors.WithAddress(s.c.Address()), + xerrors.WithAddress(s.parentConn.Address()), ) if s.sentMark.canRetry() { return s.wrapError(xerrors.Retryable(err, @@ -161,7 +178,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { return s.wrapError( xerrors.Operation( xerrors.FromOperation(operation), - xerrors.WithAddress(s.c.Address()), + xerrors.WithAddress(s.parentConn.Address()), ), ) } @@ -177,7 +194,7 @@ func (s *grpcClientStream) wrapError(err error) error { } return xerrors.WithStackTrace( - newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err), + newConnError(s.parentConn.endpoint.NodeID(), s.parentConn.endpoint.Address(), err), xerrors.WithSkipDepth(1), ) } diff --git a/internal/xcontext/cancels_quard.go b/internal/xcontext/cancels_quard.go index 7fd344696..b2743e656 100644 --- a/internal/xcontext/cancels_quard.go +++ b/internal/xcontext/cancels_quard.go @@ -5,10 +5,12 @@ import ( "sync" ) -type CancelsGuard struct { - mu sync.Mutex - cancels map[*context.CancelFunc]struct{} -} +type ( + CancelsGuard struct { + mu sync.Mutex + cancels map[*context.CancelFunc]struct{} + } +) func NewCancelsGuard() *CancelsGuard { return &CancelsGuard{ @@ -16,16 +18,18 @@ func NewCancelsGuard() *CancelsGuard { } } -func (g *CancelsGuard) Remember(cancel *context.CancelFunc) { +func (g *CancelsGuard) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) { g.mu.Lock() defer g.mu.Unlock() - g.cancels[cancel] = struct{}{} -} + ctx, cancel := WithCancel(ctx) + g.cancels[&cancel] = struct{}{} -func (g *CancelsGuard) Forget(cancel *context.CancelFunc) { - g.mu.Lock() - defer g.mu.Unlock() - delete(g.cancels, cancel) + return ctx, func() { + cancel() + g.mu.Lock() + defer g.mu.Unlock() + delete(g.cancels, &cancel) + } } func (g *CancelsGuard) Cancel() { diff --git a/internal/xcontext/cancels_quard_test.go b/internal/xcontext/cancels_quard_test.go index 98c2faf2c..5b27a187d 100644 --- a/internal/xcontext/cancels_quard_test.go +++ b/internal/xcontext/cancels_quard_test.go @@ -9,16 +9,15 @@ import ( func TestCancelsGuard(t *testing.T) { g := NewCancelsGuard() - ctx, cancel1 := context.WithCancel(context.Background()) - g.Remember(&cancel1) + ctx, cancel1 := g.WithCancel(context.Background()) require.Len(t, g.cancels, 1) - g.Forget(&cancel1) + cancel1() + require.Error(t, ctx.Err()) require.Empty(t, g.cancels, 0) - cancel2 := context.CancelFunc(func() { - cancel1() - }) - g.Remember(&cancel2) + ctx, _ = g.WithCancel(context.Background()) require.Len(t, g.cancels, 1) + ctx, _ = g.WithCancel(ctx) + require.Len(t, g.cancels, 2) g.Cancel() require.Error(t, ctx.Err()) } diff --git a/trace/driver.go b/trace/driver.go index 8cf5dedc3..31a55d088 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -48,6 +48,8 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnConnStreamCloseSend func(DriverConnStreamCloseSendStartInfo) func(DriverConnStreamCloseSendDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnConnStreamFinish func(info DriverConnStreamFinishInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnConnDial func(DriverConnDialStartInfo) func(DriverConnDialDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnConnBan func(DriverConnBanStartInfo) func(DriverConnBanDoneInfo) @@ -416,6 +418,16 @@ type ( Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + DriverConnStreamFinishInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context context.Context //nolint:containedctx + Call call + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals DriverBalancerInitStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index 50491225a..14df36f4b 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -453,6 +453,25 @@ func (t *Driver) Compose(x *Driver, opts ...DriverComposeOption) *Driver { } } } + { + h1 := t.OnConnStreamFinish + h2 := x.OnConnStreamFinish + ret.OnConnStreamFinish = func(info DriverConnStreamFinishInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(info) + } + if h2 != nil { + h2(info) + } + } + } { h1 := t.OnConnDial h2 := x.OnConnDial @@ -1055,6 +1074,13 @@ func (t *Driver) onConnStreamCloseSend(d DriverConnStreamCloseSendStartInfo) fun } return res } +func (t *Driver) onConnStreamFinish(info DriverConnStreamFinishInfo) { + fn := t.OnConnStreamFinish + if fn == nil { + return + } + fn(info) +} func (t *Driver) onConnDial(d DriverConnDialStartInfo) func(DriverConnDialDoneInfo) { fn := t.OnConnDial if fn == nil { @@ -1235,6 +1261,7 @@ func (t *Driver) onGetCredentials(d DriverGetCredentialsStartInfo) func(DriverGe } return res } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { var p DriverInitStartInfo @@ -1250,6 +1277,7 @@ func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, dat res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnWith(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { var p DriverWithStartInfo @@ -1265,6 +1293,7 @@ func DriverOnWith(t *Driver, c *context.Context, call call, endpoint string, dat res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { var p DriverCloseStartInfo @@ -1277,6 +1306,7 @@ func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { var p DriverConnPoolNewStartInfo @@ -1288,6 +1318,7 @@ func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { var p DriverConnPoolReleaseStartInfo @@ -1300,6 +1331,7 @@ func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnResolve(t *Driver, call call, target string, resolved []string) func(error) { var p DriverResolveStartInfo @@ -1313,6 +1345,7 @@ func DriverOnResolve(t *Driver, call call, target string, resolved []string) fun res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStateChange(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnStateChangeStartInfo @@ -1327,6 +1360,7 @@ func DriverOnConnStateChange(t *Driver, c *context.Context, call call, endpoint res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnInvoke(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(_ error, issues []Issue, opID string, state ConnState, metadata map[string][]string) { var p DriverConnInvokeStartInfo @@ -1345,6 +1379,7 @@ func DriverOnConnInvoke(t *Driver, c *context.Context, call call, endpoint Endpo res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnNewStream(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(_ error, state ConnState) { var p DriverConnNewStreamStartInfo @@ -1360,6 +1395,7 @@ func DriverOnConnNewStream(t *Driver, c *context.Context, call call, endpoint En res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamRecvMsg(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamRecvMsgStartInfo @@ -1372,6 +1408,7 @@ func DriverOnConnStreamRecvMsg(t *Driver, c *context.Context, call call) func(er res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamSendMsg(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamSendMsgStartInfo @@ -1384,6 +1421,7 @@ func DriverOnConnStreamSendMsg(t *Driver, c *context.Context, call call) func(er res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamCloseSend(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamCloseSendStartInfo @@ -1396,6 +1434,16 @@ func DriverOnConnStreamCloseSend(t *Driver, c *context.Context, call call) func( res(p) } } + +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func DriverOnConnStreamFinish(t *Driver, c context.Context, call call, e error) { + var p DriverConnStreamFinishInfo + p.Context = c + p.Call = call + p.Error = e + t.onConnStreamFinish(p) +} + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnDialStartInfo @@ -1409,6 +1457,7 @@ func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint Endpoin res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState, cause error) func(state ConnState) { var p DriverConnBanStartInfo @@ -1424,6 +1473,7 @@ func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint Endpoint res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnAllow(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnAllowStartInfo @@ -1438,6 +1488,7 @@ func DriverOnConnAllow(t *Driver, c *context.Context, call call, endpoint Endpoi res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnParkStartInfo @@ -1451,6 +1502,7 @@ func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint Endpoin res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnClose(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnCloseStartInfo @@ -1464,6 +1516,7 @@ func DriverOnConnClose(t *Driver, c *context.Context, call call, endpoint Endpoi res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, call call, name string, event string) func(error) { var p DriverRepeaterWakeUpStartInfo @@ -1478,6 +1531,7 @@ func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, call call, name strin res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerInit(t *Driver, c *context.Context, call call, name string) func(error) { var p DriverBalancerInitStartInfo @@ -1491,6 +1545,7 @@ func DriverOnBalancerInit(t *Driver, c *context.Context, call call, name string) res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerClose(t *Driver, c *context.Context, call call) func(error) { var p DriverBalancerCloseStartInfo @@ -1503,6 +1558,7 @@ func DriverOnBalancerClose(t *Driver, c *context.Context, call call) func(error) res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) func(endpoint EndpointInfo, _ error) { var p DriverBalancerChooseEndpointStartInfo @@ -1516,6 +1572,7 @@ func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) fu res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call call, address string) func(error) { var p DriverBalancerClusterDiscoveryAttemptStartInfo @@ -1529,6 +1586,7 @@ func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string) { var p DriverBalancerUpdateStartInfo @@ -1545,6 +1603,7 @@ func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalD res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnGetCredentials(t *Driver, c *context.Context, call call) func(token string, _ error) { var p DriverGetCredentialsStartInfo From 36141fb35276aa92653a1df5fe14861a707ea33c Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 2 May 2024 21:13:43 +0300 Subject: [PATCH 3/3] prettify xtest grpc stream msgs --- internal/xtest/grpclogger.go | 41 ++++++++++++++++++++++-------------- trace/driver_gtrace.go | 25 ---------------------- 2 files changed, 25 insertions(+), 41 deletions(-) diff --git a/internal/xtest/grpclogger.go b/internal/xtest/grpclogger.go index 3506d4ccb..102eeb5d2 100644 --- a/internal/xtest/grpclogger.go +++ b/internal/xtest/grpclogger.go @@ -35,13 +35,11 @@ func (l GrpcLogger) UnaryClientInterceptor( opts ...grpc.CallOption, ) error { err := invoker(ctx, method, req, reply, cc, opts...) - l.t.Logf( - "UnaryClientInterceptor: %s - err: %v\n\nreq:\n%v\n\nresp:\n%v", - method, - err, - req, - reply, - ) + if err != nil { + l.t.Logf("UnaryClientInterceptor: %s - err: %v\n\nreq:\n%v\n\nresp:\n%v", method, err, req, reply) + } else { + l.t.Logf("UnaryClientInterceptor: %s:\n\nreq:\n%v\n\nresp:\n%v", method, req, reply) + } return err } @@ -59,12 +57,11 @@ func (l GrpcLogger) StreamClientInterceptor( if stream != nil { stream = streamWrapper } - l.t.Logf( - "StreamStart: %v with err '%v' (streamID: %v)", - method, - err, - streamWrapper.streamID, - ) + if err != nil { + l.t.Logf("StreamStart: %v with err '%v' (streamID: %v)", method, err, streamWrapper.streamID) + } else { + l.t.Logf("StreamStart: %v (streamID: %v)", method, streamWrapper.streamID) + } return stream, err } @@ -81,21 +78,33 @@ func newGrpcLoggerStream(stream grpc.ClientStream, t testing.TB) grpcLoggerStrea func (g grpcLoggerStream) CloseSend() error { err := g.ClientStream.CloseSend() - g.t.Logf("CloseSend: %v (streamID: %v)", err, g.streamID) + if err != nil { + g.t.Logf("CloseSend: %v (streamID: %v)", err, g.streamID) + } else { + g.t.Logf("CloseSend (streamID: %v)", g.streamID) + } return err } func (g grpcLoggerStream) SendMsg(m interface{}) error { err := g.ClientStream.SendMsg(m) - g.t.Logf("SendMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m) + if err != nil { + g.t.Logf("SendMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m) + } else { + g.t.Logf("SendMsg (streamID: %v):\n%v ", g.streamID, m) + } return err } func (g grpcLoggerStream) RecvMsg(m interface{}) error { err := g.ClientStream.RecvMsg(m) - g.t.Logf("RecvMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m) + if err != nil { + g.t.Logf("RecvMsg (streamID: %v) with err '%v':\n%v ", g.streamID, err, m) + } else { + g.t.Logf("RecvMsg (streamID: %v):\n%v ", g.streamID, m) + } return err } diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index 14df36f4b..32c008d09 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -1261,7 +1261,6 @@ func (t *Driver) onGetCredentials(d DriverGetCredentialsStartInfo) func(DriverGe } return res } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { var p DriverInitStartInfo @@ -1277,7 +1276,6 @@ func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, dat res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnWith(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { var p DriverWithStartInfo @@ -1293,7 +1291,6 @@ func DriverOnWith(t *Driver, c *context.Context, call call, endpoint string, dat res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { var p DriverCloseStartInfo @@ -1306,7 +1303,6 @@ func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { var p DriverConnPoolNewStartInfo @@ -1318,7 +1314,6 @@ func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { var p DriverConnPoolReleaseStartInfo @@ -1331,7 +1326,6 @@ func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnResolve(t *Driver, call call, target string, resolved []string) func(error) { var p DriverResolveStartInfo @@ -1345,7 +1339,6 @@ func DriverOnResolve(t *Driver, call call, target string, resolved []string) fun res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStateChange(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnStateChangeStartInfo @@ -1360,7 +1353,6 @@ func DriverOnConnStateChange(t *Driver, c *context.Context, call call, endpoint res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnInvoke(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(_ error, issues []Issue, opID string, state ConnState, metadata map[string][]string) { var p DriverConnInvokeStartInfo @@ -1379,7 +1371,6 @@ func DriverOnConnInvoke(t *Driver, c *context.Context, call call, endpoint Endpo res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnNewStream(t *Driver, c *context.Context, call call, endpoint EndpointInfo, m Method) func(_ error, state ConnState) { var p DriverConnNewStreamStartInfo @@ -1395,7 +1386,6 @@ func DriverOnConnNewStream(t *Driver, c *context.Context, call call, endpoint En res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamRecvMsg(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamRecvMsgStartInfo @@ -1408,7 +1398,6 @@ func DriverOnConnStreamRecvMsg(t *Driver, c *context.Context, call call) func(er res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamSendMsg(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamSendMsgStartInfo @@ -1421,7 +1410,6 @@ func DriverOnConnStreamSendMsg(t *Driver, c *context.Context, call call) func(er res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamCloseSend(t *Driver, c *context.Context, call call) func(error) { var p DriverConnStreamCloseSendStartInfo @@ -1434,7 +1422,6 @@ func DriverOnConnStreamCloseSend(t *Driver, c *context.Context, call call) func( res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnStreamFinish(t *Driver, c context.Context, call call, e error) { var p DriverConnStreamFinishInfo @@ -1443,7 +1430,6 @@ func DriverOnConnStreamFinish(t *Driver, c context.Context, call call, e error) p.Error = e t.onConnStreamFinish(p) } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnDialStartInfo @@ -1457,7 +1443,6 @@ func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint Endpoin res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState, cause error) func(state ConnState) { var p DriverConnBanStartInfo @@ -1473,7 +1458,6 @@ func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint Endpoint res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnAllow(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState) func(state ConnState) { var p DriverConnAllowStartInfo @@ -1488,7 +1472,6 @@ func DriverOnConnAllow(t *Driver, c *context.Context, call call, endpoint Endpoi res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnParkStartInfo @@ -1502,7 +1485,6 @@ func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint Endpoin res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnConnClose(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { var p DriverConnCloseStartInfo @@ -1516,7 +1498,6 @@ func DriverOnConnClose(t *Driver, c *context.Context, call call, endpoint Endpoi res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, call call, name string, event string) func(error) { var p DriverRepeaterWakeUpStartInfo @@ -1531,7 +1512,6 @@ func DriverOnRepeaterWakeUp(t *Driver, c *context.Context, call call, name strin res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerInit(t *Driver, c *context.Context, call call, name string) func(error) { var p DriverBalancerInitStartInfo @@ -1545,7 +1525,6 @@ func DriverOnBalancerInit(t *Driver, c *context.Context, call call, name string) res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerClose(t *Driver, c *context.Context, call call) func(error) { var p DriverBalancerCloseStartInfo @@ -1558,7 +1537,6 @@ func DriverOnBalancerClose(t *Driver, c *context.Context, call call) func(error) res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) func(endpoint EndpointInfo, _ error) { var p DriverBalancerChooseEndpointStartInfo @@ -1572,7 +1550,6 @@ func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) fu res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call call, address string) func(error) { var p DriverBalancerClusterDiscoveryAttemptStartInfo @@ -1586,7 +1563,6 @@ func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string) { var p DriverBalancerUpdateStartInfo @@ -1603,7 +1579,6 @@ func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalD res(p) } } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func DriverOnGetCredentials(t *Driver, c *context.Context, call call) func(token string, _ error) { var p DriverGetCredentialsStartInfo