Skip to content

Commit

Permalink
API refactroing
Browse files Browse the repository at this point in the history
Deduplicate "pointsQuery" fields
Deduplicate "preparedTagValuesQuery" fields
  • Loading branch information
alpinskiy committed Nov 27, 2024
1 parent cef328b commit 8b52554
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 100 deletions.
18 changes: 9 additions & 9 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1825,13 +1825,13 @@ func (h *requestHandler) handleGetMetricTagValues(ctx context.Context, req getMe
}

pq := &preparedTagValuesQuery{
metricID: metricMeta.MetricID,
preKeyTagX: format.TagIndex(metricMeta.PreKeyTagID),
preKeyTagID: metricMeta.PreKeyTagID,
tagID: tagID,
numResults: numResults,
filterIn: mappedFilterIn,
filterNotIn: mappedFilterNotIn,
queryFilter: queryFilter{
metric: metricMeta,
filterIn: mappedFilterIn,
filterNotIn: mappedFilterNotIn,
},
tagID: tagID,
numResults: numResults,
}

tagInfo := map[selectRow]float64{}
Expand Down Expand Up @@ -2962,7 +2962,7 @@ func loadPoints(ctx context.Context, h *requestHandler, pq *pointsQuery, lod dat
isFast := lod.IsFast()
isLight := cols.isLight()
IsHardware := cols.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
start := time.Now()
Expand Down Expand Up @@ -3028,7 +3028,7 @@ func loadPoint(ctx context.Context, h *requestHandler, pq *pointsQuery, lod data
isFast := lod.IsFast()
isLight := cols.isLight()
isHardware := cols.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
err = h.doSelect(ctx, util.QueryMetaInto{
Expand Down
26 changes: 11 additions & 15 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,15 +459,15 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
var tx int // time index
for _, lod := range lods {
pq := pointsQuery{
version: h.version,
user: h.accessInfo.user,
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
kind: kind,
by: qry.GroupBy,
filterIn: qry.FilterIn,
filterNotIn: qry.FilterNotIn,
queryFilter: queryFilter{
metric: qry.Metric,
filterIn: qry.FilterIn,
filterNotIn: qry.FilterNotIn,
},
version: h.version,
user: h.accessInfo.user,
kind: kind,
by: qry.GroupBy,
}
switch qry.Options.Mode {
case data_model.PointQuery:
Expand Down Expand Up @@ -651,9 +651,7 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagValuesQuery) ([]int32, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
queryFilter: queryFilter{metric: qry.Metric},
tagID: format.TagID(qry.TagIndex),
numResults: math.MaxInt - 1,
}
Expand Down Expand Up @@ -692,9 +690,7 @@ func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagVal
func (h *requestHandler) QueryStringTop(ctx context.Context, qry promql.TagValuesQuery) ([]string, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
queryFilter: queryFilter{metric: qry.Metric},
tagID: format.StringTopTagID,
numResults: math.MaxInt - 1,
}
Expand Down
110 changes: 67 additions & 43 deletions internal/api/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,28 @@ import (
)

type preparedTagValuesQuery struct {
metricID int32
preKeyTagX int
preKeyTagID string
tagID string
numResults int
filterIn data_model.TagFilters
filterNotIn data_model.TagFilters
queryFilter
tagID string
numResults int
}

func (q *preparedTagValuesQuery) stringTag() bool {
return q.tagID == format.StringTopTagID
}

type pointsQuery struct {
version string
user string
metricID int32
preKeyTagX int
preKeyTagID string
isStringTop bool
kind data_model.DigestKind
by []string
type queryFilter struct {
metric *format.MetricMetaValue
filterIn data_model.TagFilters
filterNotIn data_model.TagFilters
sort pointsQuerySort // for table view requests
}

type pointsQuery struct {
queryFilter
version string
user string
kind data_model.DigestKind
by []string
sort pointsQuerySort // for table view requests
}

type pointsQuerySort int
Expand All @@ -63,11 +60,11 @@ func (pq *pointsQuery) cacheKey() string {
sb.WriteString(Version3)
}
sb.WriteString(";m=")
sb.WriteString(fmt.Sprint(pq.metricID))
sb.WriteString(fmt.Sprint(pq.metricID()))
sb.WriteString(";pk=")
sb.WriteString(fmt.Sprint(pq.preKeyTagX))
sb.WriteString(pq.preKeyTagID())
sb.WriteString(";st=")
sb.WriteString(fmt.Sprint(pq.isStringTop))
sb.WriteString(fmt.Sprint(pq.isStringTop()))
sb.WriteString(";kind=")
sb.WriteString(fmt.Sprint(int(pq.kind)))
sb.WriteString(";by=")
Expand Down Expand Up @@ -177,20 +174,20 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
// no need to escape anything as long as table and tag names are fixed
var sb strings.Builder
sb.WriteString("SELECT ")
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID()))
sb.WriteString(" AS ")
sb.WriteString(valueName)
sb.WriteString(",toFloat64(")
sb.WriteString(sqlAggFn(lod.Version, "sum"))
sb.WriteString("(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeV1DateFilter(&sb, &lod)
for i, ids := range pq.filterIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -204,7 +201,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
for i, ids := range pq.filterNotIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" NOT IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -216,7 +213,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
sb.WriteString(")")
}
sb.WriteString(" GROUP BY ")
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID()))
sb.WriteString(" HAVING _count>0 ORDER BY _count DESC,")
sb.WriteString(valueName)
sb.WriteString(" LIMIT ")
Expand All @@ -234,7 +231,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV3(lod data_model.LOD) (string,
sb.WriteString(" AS _unmapped,toFloat64(sum(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _mapped,_unmapped HAVING _count>0 ORDER BY _count,_mapped,_unmapped DESC LIMIT ")
Expand All @@ -259,7 +256,7 @@ func (pq *preparedTagValuesQuery) tagValueIDsQueryV3(lod data_model.LOD) (string
sb.WriteString(" AS _mapped,toFloat64(sum(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _mapped HAVING _count>0 ORDER BY _count,_mapped DESC LIMIT ")
Expand Down Expand Up @@ -414,7 +411,7 @@ type pointsQueryMeta struct {

func loadPointsSelectWhat(sb *strings.Builder, pq *pointsQuery, version string) (int, error) {
var (
isStringTop = pq.isStringTop
isStringTop = pq.isStringTop()
kind = pq.kind
)
if version == Version1 && isStringTop {
Expand Down Expand Up @@ -492,7 +489,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(fmt.Sprintf("toInt64(%d) AS _stepSec", lod.StepSec))
}
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
cnt, err := loadPointsSelectWhat(&sb, pq, lod.Version)
if err != nil {
Expand All @@ -501,12 +498,12 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(" FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeV1DateFilter(&sb, &lod)
for i, ids := range pq.filterIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -520,7 +517,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
for i, ids := range pq.filterNotIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" NOT IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -533,7 +530,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
}
sb.WriteString(" GROUP BY _time")
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
var having bool
switch pq.kind {
Expand All @@ -554,7 +551,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
}
sb.WriteString(" ORDER BY _time")
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
if pq.sort == sortDescending {
sb.WriteString(" DESC")
Expand All @@ -563,7 +560,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(fmt.Sprintf(" LIMIT %v SETTINGS optimize_aggregation_in_order=1", limit))
cols := newPointsSelectColsV2(pointsQueryMeta{
queryMeta: queryMeta{
metricID: pq.metricID,
metricID: pq.metricID(),
kind: pq.kind,
user: pq.user,
},
Expand Down Expand Up @@ -599,7 +596,7 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
sb.WriteString(" AND index_type=0")
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _time")
Expand Down Expand Up @@ -642,7 +639,7 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
sb.WriteString(" SETTINGS optimize_aggregation_in_order=1")
cols := newPointsSelectColsV3(pointsQueryMeta{
queryMeta: queryMeta{
metricID: pq.metricID,
metricID: pq.metricID(),
kind: pq.kind,
user: pq.user,
},
Expand All @@ -654,6 +651,31 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
return sb.String(), cols, err
}

func (pq *queryFilter) isStringTop() bool {
return pq.metric != nil && pq.metric.StringTopDescription != ""
}

func (pq *queryFilter) metricID() int32 {
if pq.metric == nil {
return 0
}
return pq.metric.MetricID
}

func (pq *queryFilter) preKeyTagID() string {
if pq.metric == nil {
return ""
}
return pq.metric.PreKeyTagID
}

func (pq *queryFilter) preKeyTagX() int {
if pq.metric == nil {
return -1
}
return format.TagIndex(pq.metric.PreKeyTagID)
}

func sqlAggFn(version string, fn string) string {
if version == Version1 {
return fn + "Merge"
Expand Down Expand Up @@ -755,12 +777,13 @@ func (s *stringFixed) String() string {
func (pq *pointsQuery) preKeyTableName(lod data_model.LOD) string {
var usePreKey bool
if lod.HasPreKey {
preKeyTagX := pq.preKeyTagX()
usePreKey = lod.PreKeyOnly ||
pq.filterIn.Contains(pq.preKeyTagX) ||
pq.filterNotIn.Contains(pq.preKeyTagX)
pq.filterIn.Contains(preKeyTagX) ||
pq.filterNotIn.Contains(preKeyTagX)
if !usePreKey {
for _, v := range pq.by {
if v == format.TagID(pq.preKeyTagX) {
if v == pq.preKeyTagID() {
usePreKey = true
break
}
Expand All @@ -774,11 +797,12 @@ func (pq *pointsQuery) preKeyTableName(lod data_model.LOD) string {
}

func (pq *preparedTagValuesQuery) preKeyTableName(lod data_model.LOD) string {
preKeyTagX := pq.preKeyTagX()
usePreKey := (lod.HasPreKey &&
(lod.PreKeyOnly ||
(pq.tagID != "" && pq.tagID == pq.preKeyTagID) ||
pq.filterIn.Contains(pq.preKeyTagX) ||
pq.filterNotIn.Contains(pq.preKeyTagX)))
(pq.tagID != "" && pq.tagID == pq.preKeyTagID()) ||
pq.filterIn.Contains(preKeyTagX) ||
pq.filterNotIn.Contains(preKeyTagX)))
if usePreKey {
return preKeyTableNames[lod.Table]
}
Expand Down
Loading

0 comments on commit 8b52554

Please sign in to comment.