From d36d48c1eded90560993158b695104009b73e137 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 24 Aug 2023 12:42:01 +0800 Subject: [PATCH] fix consumption less zero Signed-off-by: Cabinfever_B --- .../resource_group/controller/controller.go | 14 +++---- client/resource_group/controller/model.go | 37 +++++++++++++++++++ pkg/mcs/resourcemanager/server/manager.go | 12 +++--- .../resourcemanager/resource_manager_test.go | 7 ++++ 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 245a9acfe2d..3eb0d847127 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1070,13 +1070,7 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType if !selected { return nil } - - deltaConsumption := &rmpb.Consumption{} - *deltaConsumption = *gc.run.consumption - sub(deltaConsumption, gc.run.lastRequestConsumption) - req.ConsumptionSinceLastRequest = deltaConsumption - - *gc.run.lastRequestConsumption = *gc.run.consumption + req.ConsumptionSinceLastRequest = updateDeltaConsumption(gc.run.lastRequestConsumption, gc.run.consumption) gc.run.lastRequestTime = time.Now() gc.run.requestInProgress = true return req @@ -1155,6 +1149,12 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() sub(gc.mu.consumption, delta) gc.mu.Unlock() + failpoint.Inject("triggerUpdate", func() { + select { + case gc.lowRUNotifyChan <- struct{}{}: + default: + } + }) return nil, nil, err } gc.successfulRequestDuration.Observe(d.Seconds()) diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index cf89aa535f9..dedc2ed7359 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -237,6 +237,43 @@ func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { custom1.KvWriteRpcCount += custom2.KvWriteRpcCount } +func updateDeltaConsumption(last *rmpb.Consumption, now *rmpb.Consumption) *rmpb.Consumption { + delta := &rmpb.Consumption{} + if now.RRU >= last.RRU { + delta.RRU = now.RRU - last.RRU + last.RRU = now.RRU + } + if now.WRU >= last.WRU { + delta.WRU = now.WRU - last.WRU + last.WRU = now.WRU + } + if now.ReadBytes >= last.ReadBytes { + delta.ReadBytes = now.ReadBytes - last.ReadBytes + last.ReadBytes = now.ReadBytes + } + if now.WriteBytes >= last.WriteBytes { + delta.WriteBytes = now.WriteBytes - last.WriteBytes + last.WriteBytes = now.WriteBytes + } + if now.TotalCpuTimeMs >= last.TotalCpuTimeMs { + delta.TotalCpuTimeMs = now.TotalCpuTimeMs - last.TotalCpuTimeMs + last.TotalCpuTimeMs = now.TotalCpuTimeMs + } + if now.SqlLayerCpuTimeMs >= last.SqlLayerCpuTimeMs { + delta.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs - last.SqlLayerCpuTimeMs + last.SqlLayerCpuTimeMs = now.SqlLayerCpuTimeMs + } + if now.KvReadRpcCount >= last.KvReadRpcCount { + delta.KvReadRpcCount = now.KvReadRpcCount - last.KvReadRpcCount + last.KvReadRpcCount = now.KvReadRpcCount + } + if now.KvWriteRpcCount >= last.KvWriteRpcCount { + delta.KvWriteRpcCount = now.KvWriteRpcCount - last.KvWriteRpcCount + last.KvWriteRpcCount = now.KvWriteRpcCount + } + return delta +} + func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) { if custom1 == nil || custom2 == nil { return diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 18d8c1f4d3c..5aee44b9db2 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -320,17 +320,17 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) ) // RU info. - if consumption.RRU != 0 { + if consumption.RRU > 0 { rruMetrics.Add(consumption.RRU) } - if consumption.WRU != 0 { + if consumption.WRU > 0 { wruMetrics.Add(consumption.WRU) } // Byte info. - if consumption.ReadBytes != 0 { + if consumption.ReadBytes > 0 { readByteMetrics.Add(consumption.ReadBytes) } - if consumption.WriteBytes != 0 { + if consumption.WriteBytes > 0 { writeByteMetrics.Add(consumption.WriteBytes) } // CPU time info. @@ -342,10 +342,10 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { kvCPUMetrics.Add(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs) } // RPC count info. - if consumption.KvReadRpcCount != 0 { + if consumption.KvReadRpcCount > 0 { readRequestCountMetrics.Add(consumption.KvReadRpcCount) } - if consumption.KvWriteRpcCount != 0 { + if consumption.KvWriteRpcCount > 0 { writeRequestCountMetrics.Add(consumption.KvWriteRpcCount) } diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 6c3a0a39146..909b6fbbbee 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -429,6 +429,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { break } } + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} + wreq := tcs.makeWriteRequest() + _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + re.Error(err) + time.Sleep(time.Millisecond * 500) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) controller.Stop() }