Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate circuitbreaker for region calls #1543

Merged
merged 8 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type TiKVClient struct {
// If a Region has not been accessed for more than the given duration (in seconds), it
// will be reloaded from the PD.
RegionCacheTTL uint `toml:"region-cache-ttl" json:"region-cache-ttl"`
// CircuitBreakerSettingsList is the config for default circuit breaker settings
CircuitBreakerSettingsList CircuitBreakerSettingsList `toml:"circuit-breaker" json:"circuit-breaker"`
// If a store has been up to the limit, it will return error for successive request to
// prevent the store occupying too much token in dispatching level.
StoreLimit int64 `toml:"store-limit" json:"store-limit"`
Expand Down Expand Up @@ -149,6 +151,23 @@ type CoprocessorCache struct {
AdmissionMinProcessMs uint64 `toml:"admission-min-process-ms" json:"-"`
}

// CircuitBreakerSettings is the config for default circuit breaker settings excluding the error rate
type CircuitBreakerSettings struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about referring to config/retry/backoff.go? We can use NewCircuitBreakerWithVars or something like that and only add a var for the error rate threshold. The other configs can be hard coded and we don't need to support changing them through the config file. @Tema @MyonKeminta @okJiang WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is problematic. Backoff is a request scope object so we can create it from vars all the time, however CircuitBreaker is an instance scope, which is created on startup and aggregates stats across all requests. Any concerns from propagating error rate directly from system variables like pingcap/tidb#58737 proposes?

As for all other configs, I think it is still make sense to keep them configurable as e.g. MinQPSToOpen could depend on the cluster size and might need tuning per workload. But I don't feel too strong about that and can rollback the last change. Otherwise, I will move them to PDClient section instead of TiKVClient as they belong there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about propagating the error rate instead of the whole Settings? As for the other configs, as we discussed before, we can make them hard-coded temporarily. If we do need to change them, then we can make them configurable. Right now, it's better to hide them to reduce the complexity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't propagate the full settings. Circuit breaker has a callback to modify any part of the settings:

func (cb *CircuitBreaker) ChangeSettings(apply func(config *Settings)) {
	apply(cb.config)
}

so in pingcap/tidb#58737 I propagate only error rate through system variables:

func (do *Domain) changePDMetadataCircuitBreakerErrorRateThresholdPct(errorRatePct uint32) {
	tikv.ChangePdRegionMetaCircuitBreakerSettings(func(config *circuitbreaker.Settings) {
		config.ErrorRateThresholdPct = errorRatePct
	})

I would like to keep this API as it allows easily to propagate any other part of the setting if needed, but not required so.

I will remove everything from TiKVClient config back to hardcoded values and keep propagating only error rate system variable as it was in the original version of this PR. Let me know if there is still any concern with that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, /cc @okJiang

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I removed all configurations except sysvar from this PR and pingcap/tidb#58737

// ErrorRateEvaluationWindowSeconds defines how long to track errors before evaluating error rate threshold.
ErrorRateEvaluationWindowSeconds uint `toml:"error-rate-evaluation-window-seconds" json:"error-rate-evaluation-window-seconds"`
// MinQPSToOpen defines the average qps over the error-rate-evaluation-window-seconds that must be met before evaluating the error rate threshold.
MinQPSToOpen uint `toml:"min-qps-to-open" json:"min-qps-to-open"`
// CooldownIntervalSeconds defines how long to wait after circuit breaker is open before go to half-open state to send a probe request.
CooldownIntervalSeconds uint `toml:"cooldown-interval-seconds" json:"cooldown-interval-seconds"`
// HalfOpenSuccessCount defines how many subsequent requests to test after cooldown period before fully close the circuit. All request in excess of this count will be errored till the circuit is fully closed pending results of the firsts HalfOpenSuccessCount requests.
HalfOpenSuccessCount uint `toml:"half-open-success-count" json:"half-open-success-count"`
}

// CircuitBreakerSettingsList is a container to configure all circuit breakers
type CircuitBreakerSettingsList struct {
PDRegionsMetadata CircuitBreakerSettings `toml:"pd-region-metadata" json:"pd-region-metadata"`
}

// DefaultTiKVClient returns default config for TiKVClient.
func DefaultTiKVClient() TiKVClient {
return TiKVClient{
Expand Down Expand Up @@ -176,7 +195,16 @@ func DefaultTiKVClient() TiKVClient {

EnableChunkRPC: true,

RegionCacheTTL: 600,
RegionCacheTTL: 600,
CircuitBreakerSettingsList: CircuitBreakerSettingsList{
PDRegionsMetadata: CircuitBreakerSettings{
ErrorRateEvaluationWindowSeconds: 30,
MinQPSToOpen: 10,
CooldownIntervalSeconds: 10,
HalfOpenSuccessCount: 1,
},
},

StoreLimit: 0,
StoreLivenessTimeout: DefStoreLivenessTimeout,

Expand Down
16 changes: 13 additions & 3 deletions internal/locate/region_cache.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems you forgot to update L2218 and L2283?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I thought it is not needed as they low qps and didn't want to affect GC unnecessary, but it is probably better to throttle them as well. Just added them.

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/circuitbreaker"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -133,6 +134,13 @@ func nextTTL(ts int64) int64 {
return ts + regionCacheTTLSec + jitter
}

var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker("region-meta", circuitbreaker.AlwaysClosedSettings)

// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker changes for region metadata calls
func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

pdRegionMetaCircuitBreaker.ChangeSettings(apply)
}

// nextTTLWithoutJitter is used for test.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
Expand Down Expand Up @@ -2070,10 +2078,11 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
start := time.Now()
var reg *router.Region
var err error
cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
if searchPrev {
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
reg, err = c.pdClient.GetPrevRegion(cbCtx, key, opts...)
} else {
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
reg, err = c.pdClient.GetRegion(cbCtx, key, opts...)
}
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
if err != nil {
Expand Down Expand Up @@ -2121,7 +2130,8 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
}
}
start := time.Now()
reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets())
cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
reg, err := c.pdClient.GetRegionByID(cbCtx, regionID, opt.WithBuckets())
metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds())
if err != nil {
metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
Expand Down
10 changes: 10 additions & 0 deletions internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/circuitbreaker"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -226,6 +227,7 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
}

func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key)
if len(opts) == 0 {
buckets = nil
Expand All @@ -244,6 +246,7 @@ func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberUR
}

func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetPrevRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key)
if len(opts) == 0 {
buckets = nil
Expand All @@ -252,6 +255,7 @@ func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.Ge
}

func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetRegionByID", ctx)
region, peer, buckets, downPeers := c.cluster.GetRegionByID(regionID)
return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
Expand Down Expand Up @@ -465,3 +469,9 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou
func (m *pdClient) GetServiceDiscovery() sd.ServiceDiscovery { return nil }

func (m *pdClient) WithCallerComponent(caller.Component) pd.Client { return m }

func enforceCircuitBreakerFor(name string, ctx context.Context) {
if circuitbreaker.FromContext(ctx) == nil {
panic(fmt.Errorf("CircuitBreaker must be configured for %s", name))
}
}
6 changes: 6 additions & 0 deletions tikv/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/circuitbreaker"
)

// RPCContext contains data that is needed to send RPC to a region.
Expand Down Expand Up @@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) {
locate.SetRegionCacheTTLSec(t)
}

// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls
func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
locate.ChangePdRegionMetaCircuitBreakerSettings(apply)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
locate.SetRegionCacheTTLWithJitter(base, jitter)
Expand Down
Loading