From 4c3662afce7e252e526c3d6143003ce1064cf7f7 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 2 Aug 2024 19:57:07 +0300 Subject: [PATCH] added simple wrapper for grpc errors for getting nodeID and address --- internal/conn/conn.go | 4 ++-- internal/conn/errors.go | 31 +++++++++++++++++++++++++++++ internal/conn/errors_test.go | 18 +++++++++++++++++ internal/conn/grpc_client_stream.go | 6 +++--- internal/xerrors/transport.go | 4 ++-- 5 files changed, 56 insertions(+), 7 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 067971fcf..795378443 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -365,7 +365,7 @@ func invoke( defer onTransportError(ctx, err) if !useWrapping { - return opID, issues, err + return opID, issues, withConnInfo(err, nodeID, address) } if sentMark.canRetry() { @@ -530,7 +530,7 @@ func (c *conn) NewStream( }() if !useWrapping { - return nil, err + return nil, withConnInfo(err, c.NodeID(), c.Address()) } if sentMark.canRetry() { diff --git a/internal/conn/errors.go b/internal/conn/errors.go index 090dc4543..874a8aa39 100644 --- a/internal/conn/errors.go +++ b/internal/conn/errors.go @@ -38,3 +38,34 @@ func IsBadConn(err error, goodConnCodes ...grpcCodes.Code) bool { return true } + +type grpcError struct { + err error + + nodeID uint32 + address string +} + +func (e *grpcError) Error() string { + return e.err.Error() +} + +func (e *grpcError) As(target any) bool { + return xerrors.As(e.err, target) +} + +func (e *grpcError) NodeID() uint32 { + return e.nodeID +} + +func (e *grpcError) Address() string { + return e.address +} + +func withConnInfo(err error, nodeID uint32, address string) error { + return &grpcError{ + err: err, + nodeID: nodeID, + address: address, + } +} diff --git a/internal/conn/errors_test.go b/internal/conn/errors_test.go index dd6b4871a..20494157a 100644 --- a/internal/conn/errors_test.go +++ b/internal/conn/errors_test.go @@ -108,3 +108,21 @@ func TestIsBadConn(t *testing.T) { }) } } + +func TestGrpcError(t *testing.T) { + err := withConnInfo(grpcStatus.Error(grpcCodes.Unavailable, "test"), 123, "test:123") + require.Equal(t, `rpc error: code = Unavailable desc = test`, err.Error()) + var nodeID interface { + NodeID() uint32 + } + require.ErrorAs(t, err, &nodeID) + require.Equal(t, uint32(123), nodeID.NodeID()) + var address interface { + Address() string + } + require.ErrorAs(t, err, &address) + require.Equal(t, "test:123", address.Address()) + s, has := grpcStatus.FromError(err) + require.True(t, has) + require.Equal(t, grpcCodes.Unavailable, s.Code()) +} diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 397d1e8d3..b80eb4898 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -59,7 +59,7 @@ func (s *grpcClientStream) CloseSend() (err error) { } if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } return xerrors.WithStackTrace(xerrors.Transport( @@ -99,7 +99,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { }() if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } if s.sentMark.canRetry() { @@ -159,7 +159,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { //nolint:funlen }() if !s.wrapping { - return err + return withConnInfo(err, s.parentConn.NodeID(), s.parentConn.Address()) } if s.sentMark.canRetry() { diff --git a/internal/xerrors/transport.go b/internal/xerrors/transport.go index 8700652b1..890fd1f9c 100644 --- a/internal/xerrors/transport.go +++ b/internal/xerrors/transport.go @@ -134,8 +134,8 @@ func IsTransportError(err error, codes ...grpcCodes.Code) bool { var status *grpcStatus.Status if t := (*transportError)(nil); errors.As(err, &t) { status = t.status - } else if t, has := grpcStatus.FromError(err); has { - status = t + } else if s, has := grpcStatus.FromError(err); has { + status = s } if status != nil { if len(codes) == 0 {