Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into rm-exitAfterDefer
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang committed Jul 17, 2024
2 parents 682443e + 8cc53a5 commit 5b3bca3
Show file tree
Hide file tree
Showing 64 changed files with 1,239 additions and 648 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ linters-settings:
excludes:
- G402
- G404
- G601
testifylint:
enable:
- bool-compare
Expand Down
19 changes: 13 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
allowFollowerHandle bool
needBuckets bool
allowFollowerHandle bool
outputMustContainAllKeyRange bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -231,6 +232,11 @@ func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// WithOutputMustContainAllKeyRange means the output must contain all key ranges.
func WithOutputMustContainAllKeyRange() GetRegionOption {
return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true }
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
Expand Down Expand Up @@ -1193,10 +1199,11 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
pbRanges = append(pbRanges, &pdpb.KeyRange{StartKey: r.StartKey, EndKey: r.EndKey})
}
req := &pdpb.BatchScanRegionsRequest{
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
Header: c.requestHeader(),
NeedBuckets: options.needBuckets,
Ranges: pbRanges,
Limit: int32(limit),
ContainAllKeyRange: options.outputMustContainAllKeyRange,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
Expand Down
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct {
}

func (e *ErrClientGetResourceGroup) Error() string {
return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause)
return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause)
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
169 changes: 125 additions & 44 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

const (
defaultResourceGroupName = "default"
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
Expand Down Expand Up @@ -356,23 +357,38 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
name := group.GetName()
gc, ok := c.loadGroupController(name)
if !ok {
continue
}
if !gc.tombstone.Load() {
gc.modifyMeta(group)
continue
}
// If the resource group is marked as tombstone before, re-create the resource group controller.
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] re-create resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
continue
}
if c.groupsController.CompareAndSwap(name, gc, newGC) {
log.Info("[resource group controller] re-create resource group cost controller for tombstone",
zap.String("name", name))
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
if item.PrevKv == nil {
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
continue
}
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
c.tombstoneGroupCostController(group.GetName())
}
}
case resp, ok := <-watchConfigChannel:
Expand Down Expand Up @@ -420,40 +436,102 @@ func (c *ResourceGroupsController) Stop() error {
return nil
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// loadGroupController just wraps the `Load` method of `sync.Map`.
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) {
tmp, ok := c.groupsController.Load(name)
if !ok {
return nil, false
}
return tmp.(*groupCostController), true
}

// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`.
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) {
tmp, loaded := c.groupsController.LoadOrStore(name, gc)
return tmp.(*groupCostController), loaded
}

// NewResourceGroupNotExistErr returns a new error that indicates the resource group does not exist.
// It's exported for testing.
func NewResourceGroupNotExistErr(name string) error {
return errors.Errorf("%s does not exist", name)
}

// tryGetResourceGroupController will try to get the resource group controller from local cache first.
// If the local cache misses, it will then call gRPC to fetch the resource group info from the remote server.
// If `useTombstone` is true, it will return the resource group controller even if it is marked as tombstone.
func (c *ResourceGroupsController) tryGetResourceGroupController(
ctx context.Context, name string, useTombstone bool,
) (*groupCostController, error) {
// Get from the local cache first.
if tmp, ok := c.groupsController.Load(name); ok {
return tmp.(*groupCostController), nil
gc, ok := c.loadGroupController(name)
if ok {
if !useTombstone && gc.tombstone.Load() {
return nil, NewResourceGroupNotExistErr(name)
}
return gc, nil
}
// Call gRPC to fetch the resource group info.
group, err := c.provider.GetResourceGroup(ctx, name)
if err != nil {
return nil, err
}
if group == nil {
return nil, errors.Errorf("%s does not exists", name)
return nil, NewResourceGroupNotExistErr(name)
}
// Check again to prevent initializing the same resource group concurrently.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
if gc, ok = c.loadGroupController(name); ok {
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
return nil, err
}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
gc, loaded := c.loadOrStoreGroupController(name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
log.Info("[resource group controller] create resource group cost controller", zap.String("name", name))
}
return tmp.(*groupCostController), nil
return gc, nil
}

// Do not delete the resource group immediately to prevent from interrupting the ongoing request,
// mark it as tombstone and create a default resource group controller for it.
func (c *ResourceGroupsController) tombstoneGroupCostController(name string) {
_, ok := c.loadGroupController(name)
if !ok {
return
}
// The default resource group controller should never be deleted.
if name == defaultResourceGroupName {
return
}
// Try to get the default group meta first.
defaultGC, err := c.tryGetResourceGroupController(c.loopCtx, defaultResourceGroupName, false)
if err != nil || defaultGC == nil {
log.Warn("[resource group controller] get default resource group meta for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group is not available.
c.groupsController.Delete(name)
return
}
// Create a default resource group controller for the tombstone resource group independently.
gc, err := newGroupCostController(defaultGC.getMeta(), c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
log.Warn("[resource group controller] create default resource group cost controller for tombstone failed",
zap.String("name", name), zap.Error(err))
// Directly delete the resource group controller if the default group controller cannot be created.
c.groupsController.Delete(name)
return
}
gc.tombstone.Store(true)
c.groupsController.Store(name, gc)
// Its metrics will be deleted in the cleanup process.
resourceGroupStatusGauge.WithLabelValues(name, name).Set(2)
log.Info("[resource group controller] default resource group controller cost created for tombstone",
zap.String("name", name))
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
Expand All @@ -465,14 +543,14 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
latestConsumption := *gc.mu.consumption
gc.mu.Unlock()
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.tombstone {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
return true
}
gc.tombstone = true
gc.inactive = true
} else {
gc.tombstone = false
gc.inactive = false
}
return true
})
Expand All @@ -498,12 +576,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
c.run.inDegradedMode = false
for _, res := range resp {
name := res.GetResourceGroupName()
v, ok := c.groupsController.Load(name)
gc, ok := c.loadGroupController(name)
if !ok {
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name))
continue
}
gc := v.(*groupCostController)
gc.handleTokenBucketResponse(res)
}
}
Expand Down Expand Up @@ -561,7 +638,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, true)
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
Expand All @@ -572,18 +649,18 @@ func (c *ResourceGroupsController) OnRequestWait(
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, resourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -594,8 +671,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
if err != nil {
return false
}
Expand All @@ -615,7 +691,7 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context,

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
gc, err := c.tryGetResourceGroupController(c.loopCtx, resourceGroupName, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -681,7 +757,10 @@ type groupCostController struct {
requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter
}

tombstone bool
// tombstone is set to true when the resource group is deleted.
tombstone atomic.Bool
// inactive is set to true when the resource group has not been updated for a long time.
inactive bool
}

type groupMetricsCollection struct {
Expand Down Expand Up @@ -774,6 +853,8 @@ func newGroupCostController(
gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
return gc, nil
}

Expand Down Expand Up @@ -1359,14 +1440,14 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

// GetActiveResourceGroup is used to get action resource group.
// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok || gc.tombstone.Load() {
return nil
}
return tmp.(*groupCostController).getMeta()
return gc.getMeta()
}

// This is used for test only.
Expand Down
Loading

0 comments on commit 5b3bca3

Please sign in to comment.