Skip to content

Commit

Permalink
Merge pull request #852 from cloudwego/release/v0.5.0
Browse files Browse the repository at this point in the history
chore: release v0.5.0
  • Loading branch information
YangruiEmma authored Mar 8, 2023
2 parents dd39b82 + bb0bcb3 commit 1f307f2
Show file tree
Hide file tree
Showing 131 changed files with 5,317 additions and 1,857 deletions.
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# For more information, please refer to https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners

* @cloudwego/Kitex-reviewers @cloudwego/Kitex-approvers @cloudwego/Kitex-maintainers
2 changes: 1 addition & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Check Spell
uses: crate-ci/typos@master
uses: crate-ci/typos@v1.13.14

staticcheck:
runs-on: [ self-hosted, X64 ]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ jobs:
with:
go-version: ${{ matrix.go }}
- name: Unit Test
run: go test -gcflags=-l -race -covermode=atomic -coverprofile=coverage.txt ./...
run: go test -gcflags=-l -race -covermode=atomic ./...
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Before you submit your Pull Request (PR) consider the following guidelines:
```
git checkout -b my-fix-branch develop
```
6. Create your patch, including appropriate test cases.
6. Create your patch, including appropriate test cases. Please refer to [Go-UT](https://pkg.go.dev/testing#pkg-overview) for writing guides. [Go-Mock](https://github.com/golang/mock) is recommended to mock interface, please refer to internal/mocks/readme.md for more details, and [Mockey](https://github.com/bytedance/mockey) is recommended to mock functions, please refer to its readme doc for specific usage.
7. Follow our [Style Guides](#code-style-guides).
8. Commit your changes using a descriptive commit message that follows [AngularJS Git Commit Message Conventions](https://docs.google.com/document/d/1QrDFcIiPjSLDn3EL15IJygNPiHORgU1_OOAqWjiDU5Y/edit).
Adherence to these conventions is necessary because release notes are automatically generated from these messages.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Kitex has built-in code generation tools that support generating **Thrift**, **P

- **Basic Features**

Including Message Type, Supported Protocols, Directly Invoke, Connection Pool, Timeout Control, Request Retry, LoadBalancer, Circuit Breaker, Rate Limiting, Instrumentation Control, Logging and HttpResolver.[[more]](https://www.cloudwego.io/docs/tutorials/basic-feature/)
Including Message Type, Supported Protocols, Directly Invoke, Connection Pool, Timeout Control, Request Retry, LoadBalancer, Circuit Breaker, Rate Limiting, Instrumentation Control, Logging and HttpResolver.[[more]](https://www.cloudwego.io/docs/kitex/tutorials/basic-feature/)

- **Governance Features**

Expand Down
28 changes: 24 additions & 4 deletions client/callopt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/cloudwego/kitex/internal/client"
"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/fallback"
"github.com/cloudwego/kitex/pkg/http"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand All @@ -45,6 +46,7 @@ type CallOptions struct {

// export field for using in client
RetryPolicy retry.Policy
Fallback *fallback.Policy
}

func newOptions() interface{} {
Expand All @@ -63,6 +65,7 @@ func (co *CallOptions) Recycle() {
co.configs = nil
co.svr = nil
co.RetryPolicy = retry.Policy{}
co.Fallback = nil
co.locks.Zero()
callOptionsPool.Put(co)
}
Expand Down Expand Up @@ -171,14 +174,14 @@ func WithTag(key, val string) Option {

// WithRetryPolicy sets the retry policy for a RPC call.
// Build retry.Policy with retry.BuildFailurePolicy or retry.BuildBackupRequest instead of building retry.Policy directly.
// Below is use demo, eg:
// Demos are provided below:
//
// demo1. call with failure retry policy, default retry error is Timeout
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))`
// demo2. call with backup request policy
// bp := retry.NewBackupPolicy(10)
// `bp := retry.NewBackupPolicy(10)
// bp.WithMaxRetryTimes(1)
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))`
func WithRetryPolicy(p retry.Policy) Option {
return Option{f: func(o *CallOptions, di *strings.Builder) {
if !p.Enable {
Expand All @@ -193,6 +196,23 @@ func WithRetryPolicy(p retry.Policy) Option {
}}
}

// WithFallback is used to set the fallback policy for a RPC call.
// Demos are provided below:
//
// demo1. call with fallback for error
// `resp, err := cli.Mock(ctx, req, callopt.WithFallback(fallback.ErrorFallback(yourFBFunc))`
// demo2. call with fallback for error and enable reportAsFallback, which sets reportAsFallback to be true and will do report(metric) as Fallback result
// `resp, err := cli.Mock(ctx, req, callopt.WithFallback(fallback.ErrorFallback(yourFBFunc).EnableReportAsFallback())`
func WithFallback(fb *fallback.Policy) Option {
return Option{f: func(o *CallOptions, di *strings.Builder) {
if !fallback.IsPolicyValid(fb) {
return
}
di.WriteString("WithFallback")
o.Fallback = fb
}}
}

// Apply applies call options to the rpcinfo.RPCConfig and internal.RemoteInfo of kitex client.
// The return value records the name and arguments of each option.
// This function is for internal purpose only.
Expand Down
26 changes: 26 additions & 0 deletions client/callopt/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package callopt

import (
"context"
"fmt"
"testing"
"time"

"github.com/cloudwego/kitex/internal/client"
"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/fallback"
"github.com/cloudwego/kitex/pkg/http"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo"
)
Expand Down Expand Up @@ -89,4 +92,27 @@ func TestApply(t *testing.T) {
v, exist := remoteInfo.Tag(mockKey)
test.Assert(t, exist)
test.Assert(t, v == mockVal, v)

// WithRetryPolicy
option = WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy()))
_, co := Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, co.RetryPolicy.Enable)
test.Assert(t, co.RetryPolicy.FailurePolicy != nil)

// WithRetryPolicy pass empty struct
option = WithRetryPolicy(retry.Policy{})
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, !co.RetryPolicy.Enable)

// WithFallback
option = WithFallback(fallback.ErrorFallback(fallback.UnwrapHelper(func(ctx context.Context, req, resp interface{}, err error) (fbResp interface{}, fbErr error) {
return
})).EnableReportAsFallback())
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, co.Fallback != nil)

// WithFallback pass nil
option = WithFallback(nil)
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, co.Fallback == nil)
}
108 changes: 74 additions & 34 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"runtime"
"runtime/debug"
"strconv"
"sync/atomic"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/event"
"github.com/cloudwego/kitex/pkg/fallback"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/loadbalance"
Expand Down Expand Up @@ -244,11 +246,14 @@ func (kc *kClient) initLBCache() error {
NameFunc: func() string { return "no_resolver" },
}
}
// because we cannot ensure that user's custom loadbalancer is cacheable, we need to disable it here
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService, Cacheable: false}
balancer := kc.opt.Balancer
if balancer == nil {
// default internal lb balancer is cacheable
cacheOpts.Cacheable = true
balancer = loadbalance.NewWeightedBalancer()
}
cacheOpts := lbcache.Options{DiagnosisService: kc.opt.DebugService}
if kc.opt.BalancerCacheOpt != nil {
cacheOpts = *kc.opt.BalancerCacheOpt
}
Expand Down Expand Up @@ -365,48 +370,67 @@ func (kc *kClient) Call(ctx context.Context, method string, request, response in
ctx = kc.opt.TracerCtl.DoStart(ctx, ri)

var callOptRetry retry.Policy
if callOpts != nil && callOpts.RetryPolicy.Enable {
callOptRetry = callOpts.RetryPolicy
}
if kc.opt.RetryContainer == nil {
if callOptRetry.Enable {
// setup retry in callopt
kc.opt.RetryContainer = retry.NewRetryContainer()
} else {
err := kc.eps(ctx, request, response)
kc.opt.TracerCtl.DoFinish(ctx, ri, err)
if err == nil {
err = ri.Invocation().BizStatusErr()
rpcinfo.PutRPCInfo(ri)
}
return err
var callOptFallback *fallback.Policy
if callOpts != nil {
callOptFallback = callOpts.Fallback
if callOpts.RetryPolicy.Enable {
callOptRetry = callOpts.RetryPolicy
}
}
if kc.opt.RetryContainer == nil && callOptRetry.Enable {
// setup retry in callopt
kc.opt.RetryContainer = retry.NewRetryContainer()
}

var callTimes int32
var prevRI rpcinfo.RPCInfo
recycleRI, err := kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, func(ctx context.Context, r retry.Retryer) (rpcinfo.RPCInfo, interface{}, error) {
currCallTimes := int(atomic.AddInt32(&callTimes, 1))
retryCtx := ctx
cRI := ri
if currCallTimes > 1 {
retryCtx, cRI, _ = kc.initRPCInfo(ctx, method)
retryCtx = metainfo.WithPersistentValue(retryCtx, retry.TransitKey, strconv.Itoa(currCallTimes-1))
if prevRI == nil {
prevRI = ri
var err error
var recycleRI bool
if kc.opt.RetryContainer == nil {
err = kc.eps(ctx, request, response)
if err == nil {
recycleRI = true
}
} else {
var callTimes int32
// prevRI represents a value of rpcinfo.RPCInfo type.
var prevRI atomic.Value
recycleRI, err = kc.opt.RetryContainer.WithRetryIfNeeded(ctx, callOptRetry, func(ctx context.Context, r retry.Retryer) (rpcinfo.RPCInfo, interface{}, error) {
currCallTimes := int(atomic.AddInt32(&callTimes, 1))
retryCtx := ctx
cRI := ri
if currCallTimes > 1 {
retryCtx, cRI, _ = kc.initRPCInfo(ctx, method)
retryCtx = metainfo.WithPersistentValue(retryCtx, retry.TransitKey, strconv.Itoa(currCallTimes-1))
if prevRI.Load() == nil {
prevRI.Store(ri)
}
r.Prepare(retryCtx, prevRI.Load().(rpcinfo.RPCInfo), cRI)
prevRI.Store(cRI)
}
r.Prepare(retryCtx, prevRI, cRI)
prevRI = cRI
err := kc.eps(retryCtx, request, response)
return cRI, response, err
}, ri, request)
}

// do fallback if with setup
fallback, hasFallback := getFallbackPolicy(callOptFallback, kc.opt.Fallback)
var fbErr error
reportErr := err
if hasFallback {
reportAsFB := false
// Notice: If rpc err is nil, rpcStatAsFB will always be false, even if it's set to true by user.
fbErr, reportAsFB = fallback.DoIfNeeded(ctx, ri, request, response, err)
if reportAsFB {
reportErr = fbErr
}
err := kc.eps(retryCtx, request, response)
return cRI, response, err
}, ri, request)
err = fbErr
}

kc.opt.TracerCtl.DoFinish(ctx, ri, err)
kc.opt.TracerCtl.DoFinish(ctx, ri, reportErr)
callOpts.Recycle()
if err == nil {
if err == nil && !hasFallback {
err = ri.Invocation().BizStatusErr()
}

if recycleRI {
// why need check recycleRI to decide if recycle RPCInfo?
// 1. no retry, rpc timeout happen will cause panic when response return
Expand Down Expand Up @@ -503,6 +527,11 @@ func (kc *kClient) invokeHandleEndpoint() (endpoint.Endpoint, error) {

// Close is not concurrency safe.
func (kc *kClient) Close() error {
defer func() {
if err := recover(); err != nil {
klog.Warnf("KITEX: panic when close client, error=%s, stack=%s", err, string(debug.Stack()))
}
}()
if kc.closed {
return nil
}
Expand Down Expand Up @@ -631,3 +660,14 @@ func (kc *kClient) warmingUp() error {

return nil
}

// return fallback policy from call option and client option.
func getFallbackPolicy(callOptFB, cliOptFB *fallback.Policy) (fb *fallback.Policy, hasFallback bool) {
if callOptFB != nil {
return callOptFB, true
}
if cliOptFB != nil {
return cliOptFB, true
}
return nil, false
}
Loading

0 comments on commit 1f307f2

Please sign in to comment.