Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 15, 2024
2 parents e13e336 + 0c7dc0b commit ff02183
Show file tree
Hide file tree
Showing 84 changed files with 872 additions and 527 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pd-analysis:
pd-heartbeat-bench:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench pd-heartbeat-bench/main.go
simulator:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_CGO_ENABLED) go build $(BUILD_FLAGS) -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
regions-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
stores-dump:
Expand Down Expand Up @@ -254,7 +254,7 @@ basic-test: install-tools

ci-test-job: install-tools dashboard-ui pd-ut
@$(FAILPOINT_ENABLE)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
./scripts/ci-subtask.sh $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
Expand Down
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct {
}

func (e *ErrClientGetResourceGroup) Error() string {
return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause)
return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause)
}
1 change: 1 addition & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
2 changes: 1 addition & 1 deletion client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func initMetrics(constLabels prometheus.Labels) {
Subsystem: "request",
Name: "tso_batch_send_latency",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
Help: "tso batch send latency",
})

Expand Down
102 changes: 72 additions & 30 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

const (
defaultResourceGroupName = "default"
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
Expand Down Expand Up @@ -330,9 +331,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case notifyMsg := <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
}
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
Expand All @@ -358,22 +357,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
if gc, ok := c.loadGroupController(group.Name); ok {
gc.modifyMeta(group)
// If the resource group is marked as tombstone before, set it as active again.
if swapped := gc.tombstone.CompareAndSwap(true, false); swapped {
resourceGroupStatusGauge.WithLabelValues(group.Name, gc.name).Set(1)
log.Info("[resource group controller] mark resource group as active", zap.String("name", group.Name))
}
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
if gc, ok := c.loadGroupController(group.Name); ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
}
}
}
Expand Down Expand Up @@ -422,12 +431,32 @@ func (c *ResourceGroupsController) Stop() error {
return nil
}

// loadGroupController just wraps the `Load` method of `sync.Map`.
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) {
tmp, ok := c.groupsController.Load(name)
if !ok {
return nil, false
}
return tmp.(*groupCostController), true
}

// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`.
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) {
tmp, loaded := c.groupsController.LoadOrStore(name, gc)
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// Get from the local cache first.
if tmp, ok := c.groupsController.Load(name); ok {
return tmp.(*groupCostController), nil
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && name != defaultResourceGroupName {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
}
return gc, nil
}
// Call gRPC to fetch the resource group info.
group, err := c.provider.GetResourceGroup(ctx, name)
Expand All @@ -438,24 +467,21 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, errors.Errorf("%s does not exists", name)
}
// Check again to prevent initializing the same resource group concurrently.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
if gc, ok = c.loadGroupController(name); ok {
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
return nil, err
}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
}
return tmp.(*groupCostController), nil
return gc, nil
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
Expand All @@ -467,14 +493,15 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
latestConsumption := *gc.mu.consumption
gc.mu.Unlock()
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.tombstone {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true
}
gc.tombstone = true
gc.inactive = true
} else {
gc.tombstone = false
gc.inactive = false
}
return true
})
Expand All @@ -500,12 +527,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
c.run.inDegradedMode = false
for _, res := range resp {
name := res.GetResourceGroupName()
v, ok := c.groupsController.Load(name)
gc, ok := c.loadGroupController(name)
if !ok {
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name))
continue
}
gc := v.(*groupCostController)
gc.handleTokenBucketResponse(res)
}
}
Expand Down Expand Up @@ -574,12 +600,16 @@ func (c *ResourceGroupsController) OnRequestWait(
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() && resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand All @@ -596,8 +626,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if err != nil {
return false
}
Expand Down Expand Up @@ -683,7 +712,10 @@ type groupCostController struct {
requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter
}

tombstone bool
// tombstone is set to true when the resource group is deleted.
tombstone atomic.Bool
// inactive is set to true when the resource group has not been updated for a long time.
inactive bool
}

type groupMetricsCollection struct {
Expand Down Expand Up @@ -776,6 +808,8 @@ func newGroupCostController(
gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
return gc, nil
}

Expand Down Expand Up @@ -1179,11 +1213,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
switch selectTyp {
case periodicReport:
selected = selected || gc.shouldReportConsumption()
failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) {
selected = gc.name == val.(string)
})
fallthrough
case lowToken:
if counter.limiter.IsLowTokens() {
selected = true
}
failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) {
if selectTyp == lowToken {
selected = gc.name == val.(string)
}
})
}
request := &rmpb.RequestUnitItem{
Type: typ,
Expand Down Expand Up @@ -1353,14 +1395,14 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

// GetActiveResourceGroup is used to get action resource group.
// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok || gc.tombstone.Load() {
return nil
}
return tmp.(*groupCostController).getMeta()
return gc.getMeta()
}

// This is used for test only.
Expand Down
Loading

0 comments on commit ff02183

Please sign in to comment.