From 78151ff7c6e464bb91a4372f38891da610f95802 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Mon, 23 Sep 2024 11:11:25 +0800 Subject: [PATCH] topsql: add backend of group by table or db (#262) close pingcap/ng-monitoring#266, ref pingcap/tidb#55540 --- component/topsql/query/default_query.go | 90 +++++++++++++++++ component/topsql/query/model.go | 23 +++++ component/topsql/query/query.go | 1 + component/topsql/service/http.go | 83 ++++++++++++---- component/topsql/service/pools.go | 23 ++++- component/topsql/topsql_test.go | 122 ++++++++++++++++++++++++ 6 files changed, 321 insertions(+), 21 deletions(-) diff --git a/component/topsql/query/default_query.go b/component/topsql/query/default_query.go index 74e6a3e..82dc6b3 100644 --- a/component/topsql/query/default_query.go +++ b/component/topsql/query/default_query.go @@ -29,6 +29,15 @@ var ( sumMapP = sumMapPool{} ) +const ( + // AggLevelQuery is the query level aggregation + AggLevelQuery = "query" + // AggLevelTable is the table level aggregation + AggLevelTable = "table" + // AggLevelDB is the db level aggregation + AggLevelDB = "db" +) + type DefaultQuery struct { vmselectHandler http.HandlerFunc documentDB *genji.DB @@ -69,6 +78,52 @@ func (dq *DefaultQuery) Records(name string, startSecs, endSecs, windowSecs, top }) } +func (dq *DefaultQuery) SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, aggBy string, fill *[]SummaryByItem) error { + if startSecs > endSecs { + return nil + } + + // adjust start to make result align to end + alignStartSecs := endSecs - (endSecs-startSecs)/windowSecs*windowSecs + + var recordsResponse recordsMetricRespV2 + if err := dq.fetchRecordsFromTSDBBy(store.MetricNameCPUTime, alignStartSecs, endSecs, windowSecs, instance, instanceType, aggBy, top, &recordsResponse); err != nil { + return err + } + if len(recordsResponse.Data.Results) == 0 { + return nil + } + + for _, result := range recordsResponse.Data.Results { + text := result.Metric[aggBy].(string) + sumItem := SummaryByItem{ + Text: text, + } + if text == "other" { + sumItem.IsOther = true + } + valueSum := uint64(0) + for _, value := range result.Values { + if len(value) != 2 { + continue + } + + ts := uint64(value[0].(float64)) + v, err := strconv.ParseUint(value[1].(string), 10, 64) + if err != nil { + continue + } + + valueSum += v + sumItem.TimestampSec = append(sumItem.TimestampSec, ts) + sumItem.CPUTimeMs = append(sumItem.CPUTimeMs, v) + } + sumItem.CPUTimeMsSum = valueSum + *fill = append(*fill, sumItem) + } + return nil +} + func (dq *DefaultQuery) Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error { if startSecs > endSecs { return nil @@ -303,6 +358,41 @@ func (dq *DefaultQuery) fetchRecordsFromTSDB(name string, startSecs int, endSecs return json.Unmarshal(respR.Body.Bytes(), metricResponse) } +func (dq *DefaultQuery) fetchRecordsFromTSDBBy(name string, startSecs int, endSecs int, windowSecs int, instance, instanceType, by string, top int, metricResponse *recordsMetricRespV2) error { + if dq.vmselectHandler == nil { + return fmt.Errorf("empty query handler") + } + + bufResp := bytesP.Get() + header := headerP.Get() + + defer bytesP.Put(bufResp) + defer headerP.Put(header) + + req, err := http.NewRequest("GET", "/api/v1/query_range", nil) + if err != nil { + return err + } + reqQuery := req.URL.Query() + reqQuery.Set("query", fmt.Sprintf("topk_avg(%d, sum(sum_over_time(%s{instance=\"%s\", instance_type=\"%s\"}[%d])) by (%s), \"%s=other\")", top, name, instance, instanceType, windowSecs, by, by)) + reqQuery.Set("start", strconv.Itoa(startSecs)) + reqQuery.Set("end", strconv.Itoa(endSecs)) + reqQuery.Set("step", strconv.Itoa(windowSecs)) + reqQuery.Set("nocache", "1") + req.URL.RawQuery = reqQuery.Encode() + req.Header.Set("Accept", "application/json") + + respR := utils.NewRespWriter(bufResp, header) + dq.vmselectHandler(&respR, req) + + if statusOK := respR.Code >= 200 && respR.Code < 300; !statusOK { + errStr := respR.Body.String() + log.Warn("failed to fetch timeseries db", zap.String("error", errStr)) + return errors.New(errStr) + } + return json.Unmarshal(respR.Body.Bytes(), metricResponse) +} + func (dq *DefaultQuery) fetchInstancesFromTSDB(startSecs, endSecs int, fill *[]InstanceItem) error { if dq.vmselectHandler == nil { return fmt.Errorf("empty query handler") diff --git a/component/topsql/query/model.go b/component/topsql/query/model.go index 2969fbb..167046a 100644 --- a/component/topsql/query/model.go +++ b/component/topsql/query/model.go @@ -26,6 +26,14 @@ type RecordPlanItem struct { SQLDurationCount []uint64 `json:"sql_duration_count,omitempty"` } +type SummaryByItem struct { + Text string `json:"text"` + TimestampSec []uint64 `json:"timestamp_sec"` + CPUTimeMs []uint64 `json:"cpu_time_ms,omitempty"` + CPUTimeMsSum uint64 `json:"cpu_time_ms_sum"` + IsOther bool `json:"is_other"` +} + type SummaryItem struct { SQLDigest string `json:"sql_digest"` SQLText string `json:"sql_text"` @@ -76,6 +84,21 @@ type recordsMetricRespDataResultMetric struct { PlanDigest string `json:"plan_digest"` } +type recordsMetricRespV2 struct { + Status string `json:"status"` + Data recordsMetricRespDataV2 `json:"data"` +} + +type recordsMetricRespDataV2 struct { + ResultType string `json:"resultType"` + Results []recordsMetricRespDataResultV2 `json:"result"` +} + +type recordsMetricRespDataResultV2 struct { + Metric map[string]interface{} `json:"metric"` + Values []recordsMetricRespDataResultValue `json:"values"` +} + type recordsMetricRespDataResultValue = []interface{} type instancesMetricResp struct { diff --git a/component/topsql/query/query.go b/component/topsql/query/query.go index a6b8799..c088de5 100644 --- a/component/topsql/query/query.go +++ b/component/topsql/query/query.go @@ -3,6 +3,7 @@ package query type Query interface { Records(name string, startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]RecordItem) error Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error + SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, by string, fill *[]SummaryByItem) error Instances(startSecs, endSecs int, fill *[]InstanceItem) error Close() } diff --git a/component/topsql/service/http.go b/component/topsql/service/http.go index 63d0bd3..4af1b95 100644 --- a/component/topsql/service/http.go +++ b/component/topsql/service/http.go @@ -14,7 +14,8 @@ import ( var ( recordsP = recordsPool{} - summaryP = summaryPool{} + summaryBySqlP = summarySQLPool{} + summaryByItemP = summaryByItemPool{} instanceItemsP = InstanceItemsPool{} metricNames = []string{ @@ -79,7 +80,7 @@ func (s *Service) metricHandler(name string) gin.HandlerFunc { } func (s *Service) summaryHandler(c *gin.Context) { - start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c) + start, end, windowSecs, top, instance, instanceType, groupBy, err := parseAllParams(c) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "status": "error", @@ -87,26 +88,71 @@ func (s *Service) summaryHandler(c *gin.Context) { }) return } - - items := summaryP.Get() - defer summaryP.Put(items) - err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items) - if err != nil { - c.JSON(http.StatusServiceUnavailable, gin.H{ - "status": "error", - "message": err.Error(), + switch groupBy { + case query.AggLevelTable: + if instanceType == "tidb" { + c.JSON(http.StatusBadRequest, gin.H{ + "status": "error", + "message": "table summary is not supported for tidb", + }) + return + } + items := summaryByItemP.Get() + defer summaryByItemP.Put(items) + err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, query.AggLevelTable, items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data_by": items, + }) + case query.AggLevelDB: + if instanceType == "tidb" { + c.JSON(http.StatusBadRequest, gin.H{ + "status": "error", + "message": "db summary is not supported for tidb", + }) + return + } + items := summaryByItemP.Get() + defer summaryByItemP.Put(items) + err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, query.AggLevelDB, items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data_by": items, + }) + default: + items := summaryBySqlP.Get() + defer summaryBySqlP.Put(items) + err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data": items, }) - return } - c.JSON(http.StatusOK, gin.H{ - "status": "ok", - "data": items, - }) } - func (s *Service) queryMetric(c *gin.Context, name string) { - start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c) + start, end, windowSecs, top, instance, instanceType, _, err := parseAllParams(c) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "status": "error", @@ -132,7 +178,7 @@ func (s *Service) queryMetric(c *gin.Context, name string) { }) } -func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, err error) { +func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, groupBy string, err error) { instance = c.Query("instance") if len(instance) == 0 { err = errors.New("no instance") @@ -174,6 +220,7 @@ func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, } windowSecs = int(duration.Seconds()) + groupBy = c.Query("group_by") return } diff --git a/component/topsql/service/pools.go b/component/topsql/service/pools.go index c42e9d9..eb5418c 100644 --- a/component/topsql/service/pools.go +++ b/component/topsql/service/pools.go @@ -77,11 +77,11 @@ func (tip *recordsPool) Put(ti *[]query.RecordItem) { tip.p.Put(ti) } -type summaryPool struct { +type summarySQLPool struct { p sync.Pool } -func (sp *summaryPool) Get() *[]query.SummaryItem { +func (sp *summarySQLPool) Get() *[]query.SummaryItem { sv := sp.p.Get() if sv == nil { return &[]query.SummaryItem{} @@ -89,11 +89,28 @@ func (sp *summaryPool) Get() *[]query.SummaryItem { return sv.(*[]query.SummaryItem) } -func (sp *summaryPool) Put(s *[]query.SummaryItem) { +func (sp *summarySQLPool) Put(s *[]query.SummaryItem) { *s = (*s)[:0] sp.p.Put(s) } +type summaryByItemPool struct { + p sync.Pool +} + +func (tp *summaryByItemPool) Get() *[]query.SummaryByItem { + tv := tp.p.Get() + if tv == nil { + return &[]query.SummaryByItem{} + } + return tv.(*[]query.SummaryByItem) +} + +func (tp *summaryByItemPool) Put(t *[]query.SummaryByItem) { + *t = (*t)[:0] + tp.p.Put(t) +} + type InstanceItemsPool struct { p sync.Pool } diff --git a/component/topsql/topsql_test.go b/component/topsql/topsql_test.go index e8567f3..0607fde 100644 --- a/component/topsql/topsql_test.go +++ b/component/topsql/topsql_test.go @@ -5,6 +5,8 @@ import ( "io/ioutil" "os" "sort" + "strconv" + "strings" "testing" "time" @@ -486,6 +488,126 @@ func (s *testTopSQLSuite) TestTiDBSummary() { }}) } +func (s *testTopSQLSuite) TestTiKVSummaryBy() { + type tikvItem struct { + ts []uint64 + cpu []uint64 + reads []uint64 + } + type tikvTestPlan map[string]tikvItem + type tikvTestData map[string]map[string]tikvTestPlan // sql -> table -> data + + instance := "127.0.0.3:20180" + tikvInstanceType := "tikv" + + data := tikvTestData{ + "sql-0": { + "table-0": { + "index": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{85, 64, 43, 19, 31}, + reads: []uint64{11, 69, 58, 21, 56}, + }, + }, + "table-1": { + "record": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{67, 19, 54, 53, 71}, + reads: []uint64{97, 82, 24, 44, 88}, + }, + "index": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{49, 93, 12, 43, 95}, + reads: []uint64{80, 44, 12, 10, 40}, + }, + }, + }, + "sql-1": { + "table-0": { + "index": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{97, 46, 29, 22, 35}, + reads: []uint64{90, 59, 46, 80, 16}, + }, + }, + "table-1": { + "record": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{51, 99, 14, 65, 27}, + reads: []uint64{22, 11, 77, 84, 33}, + }, + }, + "table-2": { + "record": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{61, 64, 83, 99, 43}, + reads: []uint64{51, 93, 42, 27, 21}, + }, + }, + }, + "sql-2": { + "table-2": { + "record": { + ts: []uint64{testBaseTs + 0, testBaseTs + 10, testBaseTs + 20, testBaseTs + 30, testBaseTs + 40}, + cpu: []uint64{61, 87, 37, 55, 53}, + reads: []uint64{50, 46, 19, 63, 81}, + }, + }, + }, + } + for sql, ndata := range data { + for table, rawdata := range ndata { + for typ, item := range rawdata { + tag := tipb.ResourceGroupTag{ + SqlDigest: []byte(sql), + PlanDigest: []byte(sql), + } + if typ == "record" { + tag.Label = tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow.Enum() + } else if typ == "index" { + tag.Label = tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex.Enum() + } + tid, err := strconv.Atoi(strings.Split(table, "-")[1]) + s.NoError(err) + tag.TableId = int64(tid) + var items []*rsmetering.GroupTagRecordItem + for i := range item.ts { + items = append(items, &rsmetering.GroupTagRecordItem{ + TimestampSec: item.ts[i], + CpuTimeMs: uint32(item.cpu[i]), + ReadKeys: uint32(item.reads[i]), + }) + } + + tagBytes, err := tag.Marshal() + s.NoError(err) + s.NoError(s.ds.ResourceMeteringRecord(instance, tikvInstanceType, &rsmetering.ResourceUsageRecord{ + RecordOneof: &rsmetering.ResourceUsageRecord_Record{Record: &rsmetering.GroupTagRecord{ + ResourceGroupTag: tagBytes, + Items: items}, + }}, nil)) + } + } + } + vmstorage.Storage.DebugFlush() + + // normal case + var res []query.SummaryByItem + err := s.dq.SummaryBy(int(testBaseTs), int(testBaseTs+40), 10, 5, instance, tikvInstanceType, query.AggLevelTable, &res) + s.NoError(err) + sortListSeq := []int{1, 2, 0} + resList := make([]int, 0, len(res)) + for _, item := range res { + tid, err := strconv.Atoi(item.Text) + s.NoError(err) + resList = append(resList, tid) + } + s.Equal(sortListSeq, resList) + res = res[:0] + err = s.dq.SummaryBy(int(testBaseTs), int(testBaseTs+40), 10, 5, instance, tikvInstanceType, query.AggLevelDB, &res) + s.NoError(err) + s.Assert().Len(res, 1) +} func (s *testTopSQLSuite) TestTiKVSummary() { type tikvPlanItem struct { ts []uint64