Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: dynamically change the controller config (#7042) #7495

Merged
3 changes: 2 additions & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ install-tools:
cd .. && $(MAKE) install-tools

static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

Expand Down
29 changes: 16 additions & 13 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,27 @@ const (
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the request is timeout.
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
)

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
Expand Down
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.20

require (
github.com/BurntSushi/toml v0.3.1
github.com/elastic/gosigar v0.14.2
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
Expand Down
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
31 changes: 22 additions & 9 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// MetaStorageClient is the interface for meta storage client.
Expand Down Expand Up @@ -125,7 +125,12 @@
PrevKv: options.prevKv,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Put(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient

Check warning on line 131 in client/meta_storage_client.go

View check run for this annotation

Codecov / codecov/patch

client/meta_storage_client.go#L130-L131

Added lines #L130 - L131 were not covered by tests
}
resp, err := cli.Put(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -158,7 +163,12 @@
Revision: options.revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.metaStorageClient().Get(ctx, req)
cli := c.metaStorageClient()
if cli == nil {
cancel()
return nil, errs.ErrClientGetMetaStorageClient

Check warning on line 169 in client/meta_storage_client.go

View check run for this annotation

Codecov / codecov/patch

client/meta_storage_client.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}
resp, err := cli.Get(ctx, req)
cancel()

if err = c.respForMetaStorageErr(cmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand All @@ -177,7 +187,11 @@
options.rangeEnd = getPrefix(key)
}

res, err := c.metaStorageClient().Watch(ctx, &meta_storagepb.WatchRequest{
cli := c.metaStorageClient()
if cli == nil {
return nil, errs.ErrClientGetMetaStorageClient

Check warning on line 192 in client/meta_storage_client.go

View check run for this annotation

Codecov / codecov/patch

client/meta_storage_client.go#L192

Added line #L192 was not covered by tests
}
res, err := cli.Watch(ctx, &meta_storagepb.WatchRequest{
Key: key,
RangeEnd: options.rangeEnd,
StartRevision: options.revision,
Expand All @@ -190,13 +204,12 @@
go func() {
defer func() {
close(eventCh)
if r := recover(); r != nil {
log.Error("[pd] panic in client `Watch`", zap.Any("error", r))
return
}
}()
for {
resp, err := res.Recv()
failpoint.Inject("watchStreamError", func() {
err = errors.Errorf("fake error")
})

Check warning on line 212 in client/meta_storage_client.go

View check run for this annotation

Codecov / codecov/patch

client/meta_storage_client.go#L211-L212

Added lines #L211 - L212 were not covered by tests
if err != nil {
return
}
Expand Down
66 changes: 33 additions & 33 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand All @@ -67,23 +67,27 @@ const (

// Because the resource manager has not been deployed in microservice mode,
// do not enable this function.
defaultDegradedModeWaitDuration = "0s"
defaultDegradedModeWaitDuration = 0
)

// ControllerConfig is the configuration of the resource manager controller which includes some option for client needed.
type ControllerConfig struct {
// Config is the configuration of the resource manager controller which includes some option for client needed.
type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}

// DefaultControllerConfig returns the default resource manager controller configuration.
func DefaultControllerConfig() *ControllerConfig {
return &ControllerConfig{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -117,44 +121,40 @@ func DefaultRequestUnitConfig() RequestUnitConfig {
}
}

// Config is the configuration of the resource units, which gives the read/write request
// RUConfig is the configuration of the resource units, which gives the read/write request
// units or request resource cost standards. It should be calculated by a given `RequestUnitConfig`
// or `RequestResourceConfig`.
type Config struct {
type RUConfig struct {
// RU model config
ReadBaseCost RequestUnit
ReadBytesCost RequestUnit
WriteBaseCost RequestUnit
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
return GenerateConfig(
DefaultControllerConfig(),
// DefaultRUConfig returns the default configuration.
func DefaultRUConfig() *RUConfig {
return GenerateRUConfig(
DefaultConfig(),
)
}

// GenerateConfig generates the configuration by the given request unit configuration.
func GenerateConfig(config *ControllerConfig) *Config {
cfg := &Config{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration)
} else {
cfg.DegradedModeWaitDuration = duration
// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
return &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
return cfg
}
Loading
Loading