Skip to content

Commit

Permalink
resource_control: dynamically change the controller config (#7042) (#…
Browse files Browse the repository at this point in the history
…7495)

close #7043

resource_control: supports dynamically change the controller config 
- supports dynamically changing the controller config
- export the `maxWaitDuration` for the local bucket limiter.

Signed-off-by: nolouch <[email protected]>
Signed-off-by: husharp <[email protected]>

Co-authored-by: nolouch <[email protected]>
Co-authored-by: Hu# <[email protected]>
Co-authored-by: ShuNing <[email protected]>
  • Loading branch information
3 people authored Dec 7, 2023
1 parent 0b048b1 commit 02b5952
Show file tree
Hide file tree
Showing 30 changed files with 922 additions and 303 deletions.
3 changes: 2 additions & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ install-tools:
cd .. && $(MAKE) install-tools

static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

Expand Down
29 changes: 16 additions & 13 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,27 @@ const (
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the request is timeout.
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
)

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
Expand Down
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.20

require (
github.com/BurntSushi/toml v0.3.1
github.com/elastic/gosigar v0.14.2
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
Expand Down
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
31 changes: 22 additions & 9 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// MetaStorageClient is the interface for meta storage client.
Expand Down Expand Up @@ -125,7 +125,12 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) (
PrevKv: options.prevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Put(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Put(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -158,7 +163,12 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s
Revision: options.revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Get(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Get(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand All @@ -177,7 +187,11 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
options.rangeEnd = getPrefix(key)
}

res, err := c.metaStorageClient().Watch(ctx, &meta_storagepb.WatchRequest{
cli := c.metaStorageClient()
if cli == nil {
return nil, errs.ErrClientGetMetaStorageClient
}
res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{
Key: key,
RangeEnd: options.rangeEnd,
StartRevision: options.revision,
Expand All @@ -190,13 +204,12 @@ func (c *client) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan
go func() {
defer func() {
close(eventCh)
if r := recover(); r != nil {
log.Error("[pd] panic in client `Watch`", zap.Any("error", r))
return
}
}()
for {
resp, err := res.Recv()
failpoint.Inject("watchStreamError", func() {
err = errors.Errorf("fake error")
})
if err != nil {
return
}
Expand Down
66 changes: 33 additions & 33 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand All @@ -67,23 +67,27 @@ const (

// Because the resource manager has not been deployed in microservice mode,
// do not enable this function.
defaultDegradedModeWaitDuration = "0s"
defaultDegradedModeWaitDuration = 0
)

// ControllerConfig is the configuration of the resource manager controller which includes some option for client needed.
type ControllerConfig struct {
// Config is the configuration of the resource manager controller which includes some option for client needed.
type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

// DefaultControllerConfig returns the default resource manager controller configuration.
func DefaultControllerConfig() *ControllerConfig {
return &ControllerConfig{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -117,44 +121,40 @@ func DefaultRequestUnitConfig() RequestUnitConfig {
}
}

// Config is the configuration of the resource units, which gives the read/write request
// RUConfig is the configuration of the resource units, which gives the read/write request
// units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
// or `RequestResourceConfig`.
type Config struct {
type RUConfig struct {
// RU model config
ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
WriteBaseCost RequestUnit
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
return GenerateConfig(
DefaultControllerConfig(),
// DefaultRUConfig returns the default configuration.
func DefaultRUConfig() *RUConfig {
return GenerateRUConfig(
DefaultConfig(),
)
}

// GenerateConfig generates the configuration by the given request unit configuration.
func GenerateConfig(config *ControllerConfig) *Config {
cfg := &Config{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration)
} else {
cfg.DegradedModeWaitDuration = duration
// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
return &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
return cfg
}
Loading

0 comments on commit 02b5952

Please sign in to comment.