Merge branch 'master' into fix_simulator_race
ti-chi-bot[bot] authored Jul 9, 2024
2 parents dff1346 + bdbe73e commit 170ddde
Showing 12 changed files with 193 additions and 21 deletions.
1 change: 1 addition & 0 deletions client/go.mod
@@ -33,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
1 change: 1 addition & 0 deletions client/go.sum
@@ -68,6 +68,7 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
2 changes: 1 addition & 1 deletion client/metrics.go
@@ -105,7 +105,7 @@ func initMetrics(constLabels prometheus.Labels) {
Subsystem: "request",
Name: "tso_batch_send_latency",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
Help: "tso batch send latency",
})

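Below is a minimal, self-contained sketch (not part of the diff) of what the new bucket layout covers: ExponentialBuckets(0.0005, 2, 13) produces 13 bounds from 0.5ms, doubling up to 0.0005 * 2^12 = 2.048s, which fits second-valued observations.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Print the 13 histogram bucket bounds used by the change above:
// 0.0005, 0.001, 0.002, ..., 2.048 (seconds).
func main() {
	fmt.Println(prometheus.ExponentialBuckets(0.0005, 2, 13))
}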
12 changes: 9 additions & 3 deletions client/resource_group/controller/controller.go
@@ -330,9 +330,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case notifyMsg := <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
}
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
@@ -1179,11 +1177,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
switch selectTyp {
case periodicReport:
selected = selected || gc.shouldReportConsumption()
failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) {
selected = gc.name == val.(string)
})
fallthrough
case lowToken:
if counter.limiter.IsLowTokens() {
selected = true
}
failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) {
if selectTyp == lowToken {
selected = gc.name == val.(string)
}
})
}
request := &rmpb.RequestUnitItem{
Type: typ,
139 changes: 139 additions & 0 deletions client/resource_group/controller/controller_test.go
@@ -24,8 +24,12 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
)

@@ -132,3 +136,138 @@ func TestResourceGroupThrottledError(t *testing.T) {
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}

// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface.
type MockResourceGroupProvider struct {
mock.Mock
}

func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
args := m.Called(ctx, resourceGroupName, opts)
return args.Get(0).(*rmpb.ResourceGroup), args.Error(1)
}

func (m *MockResourceGroupProvider) ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) {
args := m.Called(ctx, opts)
return args.Get(0).([]*rmpb.ResourceGroup), args.Error(1)
}

func (m *MockResourceGroupProvider) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
args := m.Called(ctx, metaGroup)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
args := m.Called(ctx, metaGroup)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
args := m.Called(ctx, resourceGroupName)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
args := m.Called(ctx, request)
return args.Get(0).([]*rmpb.TokenBucketResponse), args.Error(1)
}

func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
args := m.Called(ctx)
return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2)
}

func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1)
}

func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1)
}

func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockProvider := new(MockResourceGroupProvider)

mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
// LoadResourceGroups
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
// Watch
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")

controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, "default")
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

c2, err := controller.tryGetResourceGroup(ctx, "test-group")
re.NoError(err)
re.Equal(testResourceGroup, c2.meta)

var expectResp []*rmpb.TokenBucketResponse
recTestGroupAcquireTokenRequest := make(chan bool)
mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
request := args.Get(1).(*rmpb.TokenBucketsRequest)
var responses []*rmpb.TokenBucketResponse
for _, req := range request.Requests {
if req.ResourceGroupName == "default" {
// Never respond to the default group's request, so that `len(c.run.currentRequests) != 0` always holds.
time.Sleep(100 * time.Second)
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: "default",
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Tokens: 100000,
},
},
},
})
} else {
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: req.ResourceGroupName,
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Tokens: 100000,
},
},
},
})
}
}
// Signal that a standalone test-group request has been received.
if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" {
recTestGroupAcquireTokenRequest <- true
}
expectResp = responses
}).Return(expectResp, nil)
// Wait for the default group to request tokens via the periodic report.
time.Sleep(2 * time.Second)
counter := c2.run.requestUnitTokens[0]
counter.limiter.mu.Lock()
counter.limiter.notify()
counter.limiter.mu.Unlock()
select {
case res := <-recTestGroupAcquireTokenRequest:
re.True(res)
case <-time.After(5 * time.Second):
re.Fail("timeout")
}
}
15 changes: 14 additions & 1 deletion client/resource_group/controller/limiter_test.go
@@ -44,6 +44,18 @@ var (
t8 = t0.Add(time.Duration(8) * d)
)

func resetTime() {
t0 = time.Now()
t1 = t0.Add(time.Duration(1) * d)
t2 = t0.Add(time.Duration(2) * d)
t3 = t0.Add(time.Duration(3) * d)
t4 = t0.Add(time.Duration(4) * d)
t5 = t0.Add(time.Duration(5) * d)
t6 = t0.Add(time.Duration(6) * d)
t7 = t0.Add(time.Duration(7) * d)
t8 = t0.Add(time.Duration(8) * d)
}

type request struct {
t time.Time
n float64
@@ -144,6 +156,7 @@ func TestNotify(t *testing.T) {
}

func TestCancel(t *testing.T) {
resetTime()
ctx := context.Background()
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
@@ -161,8 +174,8 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(4*time.Second, d)
re.Error(err)
re.Equal(4*time.Second, d)
re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00")
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
4 changes: 2 additions & 2 deletions client/tso_stream.go
@@ -139,7 +139,7 @@ func (s *pdTSOStream) processRequests(
}
return
}
tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
resp, err := s.stream.Recv()
if err != nil {
if err == io.EOF {
@@ -195,7 +195,7 @@ func (s *tsoTSOStream) processRequests(
}
return
}
tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime)))
tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds())
resp, err := s.stream.Recv()
if err != nil {
if err == io.EOF {
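A minimal standalone sketch (illustrative, not from the diff) of the unit fix above: float64() applied to a time.Duration yields nanoseconds, while the tso_batch_send_latency histogram's new buckets are in seconds, so Seconds() is the value to observe.

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	time.Sleep(2 * time.Millisecond)
	elapsed := time.Since(start)
	fmt.Println(float64(elapsed))  // nanoseconds, roughly 2e6: far outside second-scale buckets
	fmt.Println(elapsed.Seconds()) // seconds, roughly 0.002: matches the new bucket range
}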
10 changes: 9 additions & 1 deletion metrics/grafana/pd.json
@@ -1655,12 +1655,20 @@
],
"targets": [
{
"expr": "pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"expr": "sum(pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}) by (name)",
"format": "time_series",
"hide": false,
"intervalFactor": 2,
"legendFormat": "{{name}}",
"refId": "A"
},
{
"expr": "pd_cluster_placement_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"format": "time_series",
"hide": true,
"intervalFactor": 2,
"legendFormat": "{{name}}--{{store}}",
"refId": "B"
}
],
"timeFrom": "1s",
2 changes: 1 addition & 1 deletion pkg/statistics/metrics.go
@@ -55,7 +55,7 @@ var (
Subsystem: "cluster",
Name: "placement_status",
Help: "Status of the cluster placement.",
}, []string{"type", "name"})
}, []string{"type", "name", "store"})

configStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
12 changes: 7 additions & 5 deletions pkg/statistics/store_collection.go
@@ -47,7 +47,7 @@ type storeStatistics struct {
LeaderCount int
LearnerCount int
WitnessCount int
LabelCounter map[string]int
LabelCounter map[string][]uint64
Preparing int
Serving int
Removing int
@@ -57,7 +57,7 @@ func newStoreStatistics(opt config.ConfProvider) *storeStatistics {
func newStoreStatistics(opt config.ConfProvider) *storeStatistics {
return &storeStatistics{
opt: opt,
LabelCounter: make(map[string]int),
LabelCounter: make(map[string][]uint64),
}
}

@@ -70,7 +70,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
key := fmt.Sprintf("%s:%s", k, v)
// exclude tombstone
if !store.IsRemoved() {
s.LabelCounter[key]++
s.LabelCounter[key] = append(s.LabelCounter[key], store.GetID())
}
}
storeAddress := store.GetAddress()
@@ -249,8 +249,10 @@ func (s *storeStatistics) Collect() {
configStatusGauge.WithLabelValues(typ).Set(value)
}

for name, value := range s.LabelCounter {
placementStatusGauge.WithLabelValues(labelType, name).Set(float64(value))
for name, stores := range s.LabelCounter {
for _, storeID := range stores {
placementStatusGauge.WithLabelValues(labelType, name, strconv.FormatUint(storeID, 10)).Set(1)
}
}

for storeID, limit := range s.opt.GetStoresLimit() {
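A minimal, self-contained sketch of the new per-store emission (the "label" type value and the sample store IDs are illustrative; only the metric name and label set come from the diff): instead of setting one counted value per label pair, the collector now sets the gauge to 1 for every (type, name, store) combination.

package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

var placementStatusGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "pd",
	Subsystem: "cluster",
	Name:      "placement_status",
	Help:      "Status of the cluster placement.",
}, []string{"type", "name", "store"})

func main() {
	prometheus.MustRegister(placementStatusGauge)
	// One series per store carrying a label pair, rather than a single count.
	labelCounter := map[string][]uint64{"zone:z1": {1, 2}, "host:h1": {1, 3}}
	for name, stores := range labelCounter {
		for _, storeID := range stores {
			placementStatusGauge.WithLabelValues("label", name, strconv.FormatUint(storeID, 10)).Set(1)
		}
	}
}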
14 changes: 8 additions & 6 deletions pkg/statistics/store_collection_test.go
@@ -85,12 +85,14 @@ func TestStoreStatistics(t *testing.T) {
re.Equal(0, stats.Disconnect)
re.Equal(1, stats.Tombstone)
re.Equal(1, stats.LowSpace)
re.Equal(2, stats.LabelCounter["zone:z1"])
re.Equal(2, stats.LabelCounter["zone:z2"])
re.Equal(2, stats.LabelCounter["zone:z3"])
re.Equal(4, stats.LabelCounter["host:h1"])
re.Equal(4, stats.LabelCounter["host:h2"])
re.Equal(2, stats.LabelCounter["zone:unknown"])
re.Len(stats.LabelCounter["zone:z1"], 2)
re.Equal([]uint64{1, 2}, stats.LabelCounter["zone:z1"])
re.Len(stats.LabelCounter["zone:z2"], 2)
re.Len(stats.LabelCounter["zone:z3"], 2)
re.Len(stats.LabelCounter["host:h1"], 4)
re.Equal([]uint64{1, 3, 5, 7}, stats.LabelCounter["host:h1"])
re.Len(stats.LabelCounter["host:h2"], 4)
re.Len(stats.LabelCounter["zone:unknown"], 2)
}

func TestSummaryStoreInfos(t *testing.T) {
2 changes: 1 addition & 1 deletion server/metrics.go
@@ -45,7 +45,7 @@ var (
Subsystem: "scheduler",
Name: "region_heartbeat_latency_seconds",
Help: "Bucketed histogram of latency (s) of receiving heartbeat.",
Buckets: prometheus.ExponentialBuckets(1, 2, 12),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{"address", "store"})

metadataGauge = prometheus.NewGaugeVec(
