diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 018afe1d6a9..e6d314c2e00 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1218,7 +1218,6 @@ }, "yaxes": [ { - "$$hashKey": "object:192", "format": "short", "label": null, "logBase": 1, @@ -1227,7 +1226,6 @@ "show": true }, { - "$$hashKey": "object:193", "format": "short", "label": null, "logBase": 1, @@ -11431,7 +11429,6 @@ "renderer": "flot", "seriesOverrides": [ { - "$$hashKey": "object:1147", "alias": "WaitRegionsLock", "bars": false, "lines": true, @@ -11439,7 +11436,6 @@ "stack": false }, { - "$$hashKey": "object:1251", "alias": "WaitSubRegionsLock", "bars": false, "lines": true, @@ -11486,14 +11482,12 @@ }, "yaxes": [ { - "$$hashKey": "object:322", "format": "s", "logBase": 1, "min": "0", "show": true }, { - "$$hashKey": "object:323", "format": "s", "logBase": 1, "show": true @@ -11606,10 +11600,15 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The count of the corresponding schedule commands which PD sends to each TiKV instance", + "description": "The count of the heartbeats which are pending in the task queue.", "editable": true, "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, "fill": 0, + "fillGradient": 0, "grid": {}, "gridPos": { "h": 8, @@ -11617,6 +11616,236 @@ "x": 12, "y": 39 }, + "hiddenSeries": false, + "id": 1608, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ 
+ { + "exemplar": true, + "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{task_type}}_({{runner_name}})", + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Heartbeat Runner Pending Task", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "opm", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The count of the heartbeats which failed in the task queue.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 47 + }, + "hiddenSeries": false, + "id": 1609, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "7.5.17", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "/max-wait-duration.*/", + "bars": true, + "lines": false, + "transform": 
"negative-Y", + "yaxis": 2 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "failed-tasks-({{runner_name}})", + "refId": "A", + "step": 4 + }, + { + "exemplar": true, + "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "hide": false, + "interval": "", + "legendFormat": "max-wait-duration-({{runner_name}})", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Concurrent Runner Failed Task", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "opm", + "label": "", + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The count of the corresponding schedule commands which PD sends to each TiKV instance", + "editable": true, + "error": false, + "fill": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 47 + }, "id": 1305, "legend": { "alignAsTable": true, @@ -11709,7 +11938,7 @@ "h": 8, "w": 12, "x": 0, - "y": 47 + "y": 55 }, "id": 1306, "legend": { @@ -11799,7 +12028,7 @@ "h": 8, "w": 12, "x": 12, - "y": 47 + "y": 55 }, "id": 1307, "legend": { @@ -11892,7 +12121,7 @@ "h": 8, "w": 12, 
"x": 0, - "y": 55 + "y": 63 }, "id": 1308, "legend": { @@ -11989,7 +12218,7 @@ "h": 8, "w": 12, "x": 12, - "y": 55 + "y": 63 }, "id": 1309, "legend": { @@ -12086,7 +12315,7 @@ "h": 8, "w": 12, "x": 0, - "y": 63 + "y": 71 }, "id": 1310, "legend": { @@ -12183,7 +12412,7 @@ "h": 8, "w": 12, "x": 12, - "y": 63 + "y": 71 }, "id": 1311, "legend": { @@ -12280,7 +12509,7 @@ "h": 8, "w": 12, "x": 0, - "y": 71 + "y": 79 }, "id": 1312, "legend": { diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 3c5020554a8..5d4443a1cc4 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -18,7 +18,10 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const nameStr = "runner_name" +const ( + nameStr = "runner_name" + taskStr = "task_type" +) var ( RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( @@ -35,7 +38,7 @@ var ( Subsystem: "ratelimit", Name: "runner_task_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) RunnerTaskFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 44ee54971f5..bfa1bf1865f 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -65,6 +65,7 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup + pendingTaskCount map[string]int64 failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -78,6 +79,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), + pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -128,6 +130,9 @@ func (cr *ConcurrentRunner) Start() { if len(cr.pendingTasks) > 0 { maxDuration = 
time.Since(cr.pendingTasks[0].submittedAt) } + for name, cnt := range cr.pendingTaskCount { + RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) } @@ -151,6 +156,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { select { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingTaskCount[task.Opts.TaskName]-- return default: return @@ -191,6 +197,7 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context) } task.submittedAt = time.Now() cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[taskOpts.TaskName]++ } return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 69c3f46d21e..42a1e94849c 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,8 +107,8 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async-task-runner" - logTaskRunner = "log-async-task-runner" + heartbeatTaskRunner = "heartbeat-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster.