Skip to content

Commit

Permalink
release v1.1.0 (#177)
Browse files Browse the repository at this point in the history
### Features

- {client, naming}: allow selector to define its own net.Addr parser
(#176)
- log: provide registration for custom format encoder (#146)

### Bug Fixes

- attachment: fix possible uint32 overflows (#161)
- attachment: copy attachment size if user provides their own rsp head
(#161)
- stream: fix the memory leak issue that occurs when stream.NewStream
fails (#161)
- errs: Msg should unwrap the inner trpc error (#161)
- http: use GotConn for obtaining remote addr in connection reuse
case(#161)
- http: http trace should not replace req ctx with transport ctx (#161)
- http: do not ignore server no response error (#161)
- restful: fix timeout does not take effect which is introduced in !1461
(#161)
- log: skip buffer and write directly when data packet exceeds expected
size( #161)
- config: set empty ip to default 0.0.0.0 to avoid graceful restart
error (#161)
- config: fix watch callback leak when call TrpcConfigLoader.Load
multiple times (#161)
- server: fix unaligned 64-bit atomic words at linux-386 (#161)
- server: don't wait entire service timeout, but close quickly on no
active request (#161)
- server: do not close old listener immediately after hot restart (#161)
- config: promise that dst of codec.Unmarshal is always
map[string]interface{} (#161)
- restful: fix that deep wildcard matches in reverse order (#161)
- log: log.Info("a","b") print "a b" instead of "ab" (#161)
- stream: return an error when receiving an unexpected frame type (#161)
- stream: ensure server returns an error when connection is closed
(#161)
- stream: fix connection overwriting when a client uses the same port to
connect.(#161)
- stream: fix client's compression type setting not working (#161)
- client: remove the write operation on *registry.Node in LoadNodeConfig
to avoid data race during selecting Node (#161)
- config: re-enable Config.Global.LocalIP to perfect !1936 (#161)
- http: get serializer should also be able to unmarshal nested structure
(#161)
- http: check type of url.Values for form serialization (#161)
- http: expose possible io.Writer interface for http response body
(#161)
- client: fix client wildcard match for config (#161)
- codec: revert !2059 "optimize performance of extracting method name
out of rpc name" (#161)
- http: fix form serialization panicking for compatibility (#161)

### Documentations

- docs: add note on listen to all addresses for a service (#144) 
- docs: refine idle timeout settings documentation for clarity (#172)
- docs: add notes on client connpool idletimeout (#170)
- pool: fix typos (#167)
- docs: add notes on service idle timeout (#166)
- docs: correct the spelling error (#163)
docs: add comments informing that global variable JSONAPI should not be
modified (#157)

### Enhancements

- github-action: allow dependabot to contribute and bump cla to v2.3.2
(#174)
- {trpc, examples, test}: upgrade google.golang.org/protobuf v1.30.0 =>
v1.33.0 (#171)
- http: improve stability of http test (#168)
- go.mod: upgrade tnet version to handle negative idle timeout (#169)
  • Loading branch information
sandyskies authored May 16, 2024
2 parents ddf95ed + 82ee6e8 commit 5904f6f
Show file tree
Hide file tree
Showing 82 changed files with 2,219 additions and 550 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/cla.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
steps:
- name: "CLA Assistant"
if: (github.event.comment.body == 'recheck' || github.event.comment.body == 'I have read the CLA Document and I hereby sign the CLA') || github.event_name == 'pull_request_target'
uses: contributor-assistant/[email protected].1
uses: contributor-assistant/[email protected].2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PERSONAL_ACCESS_TOKEN: ${{ secrets.CLA_DATABASE_ACCESS_TOKEN }}
Expand All @@ -28,4 +28,5 @@ jobs:
path-to-signatures: 'signatures/${{ github.event.repository.name }}-${{ github.repository_id }}/cla.json'
path-to-document: 'https://github.com/trpc-group/cla-database/blob/main/Tencent-Contributor-License-Agreement.md'
# branch should not be protected
branch: 'main'
branch: 'main'
allowlist: dependabot
15 changes: 12 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func selectorFilter(ctx context.Context, req interface{}, rsp interface{}, next
if err != nil {
return OptionsFromContext(ctx).fixTimeout(err)
}
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address)
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address, node.ParseAddr)

// Start to process the next filter and report.
begin := time.Now()
Expand Down Expand Up @@ -471,11 +471,21 @@ func getNode(opts *Options) (*registry.Node, error) {
return node, nil
}

func ensureMsgRemoteAddr(msg codec.Msg, network string, address string) {
func ensureMsgRemoteAddr(
msg codec.Msg,
network, address string,
parseAddr func(network, address string) net.Addr,
) {
// If RemoteAddr has already been set, just return.
if msg.RemoteAddr() != nil {
return
}

if parseAddr != nil {
msg.WithRemoteAddr(parseAddr(network, address))
return
}

switch network {
case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6":
// Check if address can be parsed as an ip.
Expand All @@ -484,7 +494,6 @@ func ensureMsgRemoteAddr(msg codec.Msg, network string, address string) {
return
}
}

var addr net.Addr
switch network {
case "tcp", "tcp4", "tcp6":
Expand Down
86 changes: 76 additions & 10 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package client_test
import (
"context"
"errors"
"fmt"
"net"
"testing"
"time"

Expand Down Expand Up @@ -409,6 +411,31 @@ func TestFixTimeout(t *testing.T) {
})
}

func TestSelectorRemoteAddrUseUserProvidedParser(t *testing.T) {
selector.Register(t.Name(), &fSelector{
selectNode: func(s string, option ...selector.Option) (*registry.Node, error) {
return &registry.Node{
Network: t.Name(),
Address: t.Name(),
ParseAddr: func(network, address string) net.Addr {
return newUnresolvedAddr(network, address)
}}, nil
},
report: func(node *registry.Node, duration time.Duration, err error) error { return nil },
})
fake := "fake"
codec.Register(fake, nil, &fakeCodec{})
ctx := trpc.BackgroundContext()
require.NotNil(t, client.New().Invoke(ctx, "failbody", nil,
client.WithServiceName(t.Name()),
client.WithProtocol(fake),
client.WithTarget(fmt.Sprintf("%s://xxx", t.Name()))))
addr := trpc.Message(ctx).RemoteAddr()
require.NotNil(t, addr)
require.Equal(t, t.Name(), addr.Network())
require.Equal(t, t.Name(), addr.String())
}

type multiplexedTransport struct {
require func(context.Context, []byte, ...transport.RoundTripOption)
fakeTransport
Expand All @@ -423,7 +450,11 @@ func (t *multiplexedTransport) RoundTrip(
return t.fakeTransport.RoundTrip(ctx, req, opts...)
}

type fakeTransport struct{}
type fakeTransport struct {
send func() error
recv func() ([]byte, error)
close func()
}

func (c *fakeTransport) RoundTrip(ctx context.Context, req []byte,
roundTripOpts ...transport.RoundTripOption) (rsp []byte, err error) {
Expand All @@ -447,18 +478,15 @@ func (c *fakeTransport) RoundTrip(ctx context.Context, req []byte,
}

func (c *fakeTransport) Send(ctx context.Context, req []byte, opts ...transport.RoundTripOption) error {
if c.send != nil {
return c.send()
}
return nil
}

func (c *fakeTransport) Recv(ctx context.Context, opts ...transport.RoundTripOption) ([]byte, error) {
body, ok := ctx.Value("recv-decode-error").(string)
if ok {
return []byte(body), nil
}

err, ok := ctx.Value("recv-error").(string)
if ok {
return nil, errors.New(err)
if c.recv != nil {
return c.recv()
}
return []byte("body"), nil
}
Expand All @@ -467,7 +495,9 @@ func (c *fakeTransport) Init(ctx context.Context, opts ...transport.RoundTripOpt
return nil
}
func (c *fakeTransport) Close(ctx context.Context) {
return
if c.close != nil {
c.close()
}
}

type fakeCodec struct {
Expand Down Expand Up @@ -524,3 +554,39 @@ func (c *fakeSelector) Select(serviceName string, opt ...selector.Option) (*regi
func (c *fakeSelector) Report(node *registry.Node, cost time.Duration, err error) error {
return nil
}

type fSelector struct {
selectNode func(string, ...selector.Option) (*registry.Node, error)
report func(*registry.Node, time.Duration, error) error
}

func (s *fSelector) Select(serviceName string, opts ...selector.Option) (*registry.Node, error) {
return s.selectNode(serviceName, opts...)
}

func (s *fSelector) Report(node *registry.Node, cost time.Duration, err error) error {
return s.report(node, cost, err)
}

// newUnresolvedAddr returns a new unresolvedAddr.
func newUnresolvedAddr(network, address string) *unresolvedAddr {
return &unresolvedAddr{network: network, address: address}
}

var _ net.Addr = (*unresolvedAddr)(nil)

// unresolvedAddr is a net.Addr which returns the original network or address.
type unresolvedAddr struct {
network string
address string
}

// Network returns the unresolved original network.
func (a *unresolvedAddr) Network() string {
return a.network
}

// String returns the unresolved original address.
func (a *unresolvedAddr) String() string {
return a.address
}
5 changes: 5 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ func RegisterConfig(conf map[string]*BackendConfig) error {

// RegisterClientConfig is called to replace backend config of single callee service by name.
func RegisterClientConfig(callee string, conf *BackendConfig) error {
if callee == "*" {
// Reset the callee and service name to enable wildcard matching.
conf.Callee = ""
conf.ServiceName = ""
}
opts, err := conf.genOptions()
if err != nil {
return err
Expand Down
22 changes: 22 additions & 0 deletions client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,25 @@ func TestConfig(t *testing.T) {
}
require.Nil(t, client.RegisterClientConfig("trpc.test.helloworld3", backconfig))
}

func TestRegisterWildcardClient(t *testing.T) {
cfg := client.Config("*")
t.Cleanup(func() {
client.RegisterClientConfig("*", cfg)
})
client.RegisterClientConfig("*", &client.BackendConfig{
DisableServiceRouter: true,
})

ch := make(chan *client.Options, 1)
c := client.New()
ctx, _ := codec.EnsureMessage(context.Background())
require.Nil(t, c.Invoke(ctx, nil, nil, client.WithFilter(
func(ctx context.Context, _, _ interface{}, _ filter.ClientHandleFunc) error {
ch <- client.OptionsFromContext(ctx)
// Skip next.
return nil
})))
opts := <-ch
require.True(t, opts.DisableServiceRouter)
}
28 changes: 18 additions & 10 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ type RecvControl interface {
// It serializes the message and sends it to server through stream transport.
// It's safe to call Recv and Send in different goroutines concurrently, but calling
// Send in different goroutines concurrently is not thread-safe.
func (s *stream) Send(ctx context.Context, m interface{}) error {
func (s *stream) Send(ctx context.Context, m interface{}) (err error) {
defer func() {
if err != nil {
s.opts.StreamTransport.Close(ctx)
}
}()

msg := codec.Message(ctx)
reqBodyBuf, err := serializeAndCompress(ctx, msg, m, s.opts)
if err != nil {
s.opts.StreamTransport.Close(ctx)
return err
}

Expand All @@ -87,7 +92,6 @@ func (s *stream) Send(ctx context.Context, m interface{}) error {
}

if err := s.opts.StreamTransport.Send(ctx, reqBuf); err != nil {
s.opts.StreamTransport.Close(ctx)
return err
}
return nil
Expand All @@ -97,18 +101,24 @@ func (s *stream) Send(ctx context.Context, m interface{}) error {
// It decodes and decompresses the message and leaves serialization to upper layer.
// It's safe to call Recv and Send in different goroutines concurrently, but calling
// Send in different goroutines concurrently is not thread-safe.
func (s *stream) Recv(ctx context.Context) ([]byte, error) {
func (s *stream) Recv(ctx context.Context) (buf []byte, err error) {
defer func() {
if err != nil {
s.opts.StreamTransport.Close(ctx)
}
}()
rspBuf, err := s.opts.StreamTransport.Recv(ctx)
if err != nil {
s.opts.StreamTransport.Close(ctx)
return nil, err
}
msg := codec.Message(ctx)
rspBodyBuf, err := s.opts.Codec.Decode(msg, rspBuf)
if err != nil {
s.opts.StreamTransport.Close(ctx)
return nil, errs.NewFrameError(errs.RetClientDecodeFail, "client codec Decode: "+err.Error())
}
if err := msg.ClientRspErr(); err != nil {
return nil, err
}
if len(rspBodyBuf) > 0 {
compressType := msg.CompressType()
if icodec.IsValidCompressType(s.opts.CurrentCompressType) {
Expand All @@ -118,9 +128,7 @@ func (s *stream) Recv(ctx context.Context) ([]byte, error) {
if icodec.IsValidCompressType(compressType) && compressType != codec.CompressTypeNoop {
rspBodyBuf, err = codec.Decompress(compressType, rspBodyBuf)
if err != nil {
s.opts.StreamTransport.Close(ctx)
return nil,
errs.NewFrameError(errs.RetClientDecodeFail, "client codec Decompress: "+err.Error())
return nil, errs.NewFrameError(errs.RetClientDecodeFail, "client codec Decompress: "+err.Error())
}
}
}
Expand Down Expand Up @@ -154,7 +162,7 @@ func (s *stream) Init(ctx context.Context, opt ...Option) (*Options, error) {
report.SelectNodeFail.Incr()
return nil, err
}
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address)
ensureMsgRemoteAddr(msg, findFirstNonEmpty(node.Network, opts.Network), node.Address, node.ParseAddr)
const invalidCost = -1
opts.Node.set(node, node.Address, invalidCost)
if opts.Codec == nil {
Expand Down
Loading

0 comments on commit 5904f6f

Please sign in to comment.