Skip to content

Commit

Permalink
Fix double close in stream client (#2693)
Browse files Browse the repository at this point in the history
* [fix] etcd config source prefix issue (#2389)

* http transport data race issue (#2436)

* [fix] #2431 http transport data race issue

* [feature] Ability to close connection while receiving.
Ability to send messages while receiving.
Icreased r channel limit to 100 to more fluently communication.
Do not dropp sent request if r channel is full.

* [fix] Do not close the transport client twice in stream connection , the transport client is closed in the rpc codec

---------

Co-authored-by: Johnson C <[email protected]>
Co-authored-by: Hunyadvári Péter <[email protected]>
  • Loading branch information
3 people authored Feb 15, 2024
1 parent 6e55bb1 commit f1a8b39
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("unsupported Content-Type: %s", contentType)
}

func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
func (r *rpcClient) call(
ctx context.Context,
node *registry.Node,
req Request,
resp interface{},
opts CallOptions,
) error {
address := node.Address
logger := r.Options().Logger

Expand Down Expand Up @@ -292,12 +298,6 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
r.codec = codec
}

releaseFunc := func(_ error) {
if err = c.Close(); err != nil {
logger.Log(log.ErrorLevel, err)
}
}

stream := &rpcStream{
id: id,
context: ctx,
Expand All @@ -308,7 +308,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
closed: make(chan bool),
// signal the end of stream,
sendEOS: true,
release: releaseFunc,
release: func(_ error) {},
}

// wait for error response
Expand Down Expand Up @@ -490,7 +490,10 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}

return merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
}

// make the call
Expand Down Expand Up @@ -586,7 +589,10 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}

return nil, merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return nil, merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
}

stream, err := r.stream(ctx, node, request, callOpts)
Expand Down

0 comments on commit f1a8b39

Please sign in to comment.