diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 245a9acfe2d..c79bfec1e56 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1019,6 +1019,9 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType } // collect request resource selected := gc.run.requestInProgress + failpoint.Inject("triggerUpdate", func() { + selected = true + }) switch gc.mode { case rmpb.GroupMode_RawMode: requests := make([]*rmpb.RawResourceItem, 0, len(requestResourceLimitTypeList)) @@ -1070,13 +1073,7 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType if !selected { return nil } - - deltaConsumption := &rmpb.Consumption{} - *deltaConsumption = *gc.run.consumption - sub(deltaConsumption, gc.run.lastRequestConsumption) - req.ConsumptionSinceLastRequest = deltaConsumption - - *gc.run.lastRequestConsumption = *gc.run.consumption + req.ConsumptionSinceLastRequest = updateDeltaConsumption(gc.run.lastRequestConsumption, gc.run.consumption) gc.run.lastRequestTime = time.Now() gc.run.requestInProgress = true return req @@ -1155,6 +1152,9 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() sub(gc.mu.consumption, delta) gc.mu.Unlock() + failpoint.Inject("triggerUpdate", func() { + gc.lowRUNotifyChan <- struct{}{} + }) return nil, nil, err } gc.successfulRequestDuration.Observe(d.Seconds()) diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index cf89aa535f9..dedc2ed7359 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -237,6 +237,43 @@ func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { custom1.KvWriteRpcCount += custom2.KvWriteRpcCount } +func updateDeltaConsumption(last *rmpb.Consumption, now *rmpb.Consumption) *rmpb.Consumption { + delta := &rmpb.Consumption{} + if now.RRU >= last.RRU { + delta.RRU = now.RRU - last.RRU + last.RRU = now.RRU + } + if now.WRU >= last.WRU { + delta.WRU = now.WRU - last.WRU + last.WRU = now.WRU + } + if now.ReadBytes >= last.ReadBytes { + delta.ReadBytes = now.ReadBytes - last.ReadBytes + last.ReadBytes = now.ReadBytes + } + if now.WriteBytes >= last.WriteBytes { + delta.WriteBytes = now.WriteBytes - last.WriteBytes + last.WriteBytes = now.WriteBytes + } + if now.TotalCpuTimeMs >= last.TotalCpuTimeMs { + delta.TotalCpuTimeMs = now.TotalCpuTimeMs - last.TotalCpuTimeMs + last.TotalCpuTimeMs = now.TotalCpuTimeMs + } + if now.SqlLayerCpuTimeMs >= last.SqlLayerCpuTimeMs { + delta.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs - last.SqlLayerCpuTimeMs + last.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs + } + if now.KvReadRpcCount >= last.KvReadRpcCount { + delta.KvReadRpcCount = now.KvReadRpcCount - last.KvReadRpcCount + last.KvReadRpcCount = now.KvReadRpcCount + } + if now.KvWriteRpcCount >= last.KvWriteRpcCount { + delta.KvWriteRpcCount = now.KvWriteRpcCount - last.KvWriteRpcCount + last.KvWriteRpcCount = now.KvWriteRpcCount + } + return delta +} + func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { if custom1 == nil || custom2 == nil { return diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 1118e347f59..a9e53f347fa 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -320,17 +320,17 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) ) // RU info. - if consumption.RRU != 0 { + if consumption.RRU > 0 { rruMetrics.Add(consumption.RRU) } - if consumption.WRU != 0 { + if consumption.WRU > 0 { wruMetrics.Add(consumption.WRU) } // Byte info. - if consumption.ReadBytes != 0 { + if consumption.ReadBytes > 0 { readByteMetrics.Add(consumption.ReadBytes) } - if consumption.WriteBytes != 0 { + if consumption.WriteBytes > 0 { writeByteMetrics.Add(consumption.WriteBytes) } // CPU time info. @@ -342,10 +342,10 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { kvCPUMetrics.Add(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs) } // RPC count info. - if consumption.KvReadRpcCount != 0 { + if consumption.KvReadRpcCount > 0 { readRequestCountMetrics.Add(consumption.KvReadRpcCount) } - if consumption.KvWriteRpcCount != 0 { + if consumption.KvWriteRpcCount > 0 { writeRequestCountMetrics.Add(consumption.KvWriteRpcCount) } diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index a48d0cda156..e0d295c825a 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -429,6 +429,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { break } } + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} + wreq := tcs.makeWriteRequest() + _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + re.Error(err) + time.Sleep(time.Millisecond * 200) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) controller.Stop() }