Skip to content

Commit

Permalink
fix processing errors on stream calls
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Mar 29, 2024
1 parent 154f7e2 commit 0db72fa
Showing 1 changed file with 69 additions and 49 deletions.
118 changes: 69 additions & 49 deletions internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ type grpcClientStream struct {
onDone func(ctx context.Context, md metadata.MD)
}

func (s *grpcClientStream) CloseSend() (err error) {
func (s *grpcClientStream) CloseSend() (finalErr error) {
onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &s.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
)
defer func() {
onDone(err)
onDone(finalErr)
}()

locked, unlock := s.c.inUse.TryLock()
Expand All @@ -41,35 +41,47 @@ func (s *grpcClientStream) CloseSend() (err error) {
stop := s.c.lastUsage.Start()
defer stop()

err = s.ClientStream.CloseSend()

err := s.ClientStream.CloseSend()
if err != nil {
if xerrors.IsContextError(err) {
if !s.wrapping {
return err
}

if !xerrors.IsTransportError(err) {
return xerrors.WithStackTrace(err)
}

if s.wrapping {
defer func() {
s.c.onTransportError(s.Context(), finalErr)
}()

if s.sentMark.canRetry() {
return s.wrapError(
xerrors.Transport(
err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
xerrors.Retryable(
xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
),
xerrors.WithName("CloseSend"),
),
)
}

return s.wrapError(err)
return s.wrapError(xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
))
}

return nil
}

func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
func (s *grpcClientStream) SendMsg(m interface{}) (finalErr error) {
onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &s.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
)
defer func() {
onDone(err)
onDone(finalErr)
}()

locked, unlock := s.c.inUse.TryLock()
Expand All @@ -81,43 +93,47 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
stop := s.c.lastUsage.Start()
defer stop()

err = s.ClientStream.SendMsg(m)

err := s.ClientStream.SendMsg(m)
if err != nil {
if xerrors.IsContextError(err) {
if !s.wrapping {
return err
}

if !xerrors.IsTransportError(err) {
return xerrors.WithStackTrace(err)
}

defer func() {
s.c.onTransportError(s.Context(), err)
s.c.onTransportError(s.Context(), finalErr)
}()

if s.wrapping {
err = xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
)
if s.sentMark.canRetry() {
return s.wrapError(xerrors.Retryable(err,
if s.sentMark.canRetry() {
return s.wrapError(
xerrors.Retryable(
xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
),
xerrors.WithName("SendMsg"),
))
}

return s.wrapError(err)
),
)
}

return err
return s.wrapError(xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
))
}

return nil
}

func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
func (s *grpcClientStream) RecvMsg(m interface{}) (finalErr error) {
onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &s.ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
)
defer func() {
onDone(err)
onDone(finalErr)
}()

locked, unlock := s.c.inUse.TryLock()
Expand All @@ -130,39 +146,42 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
defer stop()

defer func() {
if err != nil {
if finalErr != nil {
md := s.ClientStream.Trailer()
s.onDone(s.ctx, md)
}
}()

err = s.ClientStream.RecvMsg(m)
err := s.ClientStream.RecvMsg(m)
if err != nil {
if xerrors.Is(err, io.EOF) || !s.wrapping {
return io.EOF
}

if err != nil { //nolint:nestif
if xerrors.IsContextError(err) {
if !xerrors.IsTransportError(err) {
return xerrors.WithStackTrace(err)
}

defer func() {
if !xerrors.Is(err, io.EOF) {
s.c.onTransportError(s.Context(), err)
}
s.c.onTransportError(s.Context(), finalErr)
}()

if s.wrapping {
err = xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
)
if s.sentMark.canRetry() {
return s.wrapError(xerrors.Retryable(err,
if s.sentMark.canRetry() {
return s.wrapError(
xerrors.Retryable(
xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
),
xerrors.WithName("RecvMsg"),
))
}

return s.wrapError(err)
),
)
}

return err
return s.wrapError(xerrors.Transport(err,
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
))
}

if s.wrapping {
Expand All @@ -172,6 +191,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
xerrors.Operation(
xerrors.FromOperation(operation),
xerrors.WithAddress(s.c.Address()),
xerrors.WithTraceID(s.traceID),
),
)
}
Expand Down

0 comments on commit 0db72fa

Please sign in to comment.