Skip to content

Commit

Permalink
[WIP] Fixed series matching
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Oct 11, 2023
1 parent 1ef8bbb commit 8ec36fa
Show file tree
Hide file tree
Showing 7 changed files with 921 additions and 1,027 deletions.
99 changes: 48 additions & 51 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,75 +2122,72 @@ func (h *Handler) handlePromqlQuery(ctx context.Context, ai accessInfo, req seri
if err != nil {
return nil, nil, err
}
bag, ok := parserV.(*promql.SeriesBag)
ts, ok := parserV.(*promql.TimeSeries)
if !ok {
return nil, nil, fmt.Errorf("string literals are not supported")
}
res := &SeriesResponse{
Series: querySeries{
Time: bag.Time,
SeriesData: bag.Data,
SeriesMeta: make([]QuerySeriesMetaV2, 0, len(bag.Data)),
Time: ts.Time,
SeriesData: make([]*[]float64, 0, len(ts.Series.Data)),
SeriesMeta: make([]QuerySeriesMetaV2, 0, len(ts.Series.Data)),
},
MetricMeta: metricMeta,
DebugQueries: traces,
}
for i := range bag.Data {
for _, s := range ts.Series.Data {
meta := QuerySeriesMetaV2{
Name: metricName,
Tags: make(map[string]SeriesMetaTag),
MaxHosts: bag.GetSMaxHostsAt(i, h),
}
if i < len(bag.Meta) {
s := bag.Meta[i]
if promqlGenerated {
meta.What = queryFn(s.What)
} else {
meta.What = queryFn(bag.Query.What)
}
meta.TimeShift = -s.Offset
meta.Total = s.Total
if bag.Query.Metric != nil {
meta.MetricType = bag.Query.Metric.MetricType
}
if meta.Total == 0 {
meta.Total = len(bag.Data)
Name: metricName,
Tags: make(map[string]SeriesMetaTag, len(s.Tags.ID2Tag)),
MaxHosts: s.Tags.GetSMaxHosts(h),
TimeShift: -s.Offset,
Total: s.Total,
}
if promqlGenerated {
meta.What = queryFn(s.What)
} else {
meta.What = queryFn(ts.Series.Meta.What)
}
if ts.Series.Meta.Metric != nil {
meta.MetricType = ts.Series.Meta.Metric.MetricType
}
if meta.Total == 0 {
meta.Total = len(ts.Series.Data)
}
for id, tag := range s.Tags.ID2Tag {
if len(tag.SValue) == 0 || tag.ID == labels.MetricName {
continue
}
meta.Tags = make(map[string]SeriesMetaTag, len(s.Tags))
for id, t := range s.Tags {
if !t.SValueSet || t.ID == labels.MetricName {
continue
}
var (
k = id
v = SeriesMetaTag{Value: tag.SValue}
)
if ts.Series.Meta.Metric != nil && tag.Index != 0 {
var (
key = id
tag = SeriesMetaTag{Value: t.SValue}
name string
index = tag.Index - promql.SeriesTagIndexOffset
)
if bag.Query.Metric != nil && t.Index != 0 {
var (
name string
index = t.Index - promql.SeriesTagIndexOffset
)
if index == format.StringTopTagIndex {
key = format.LegacyStringTopTagID
name = format.StringTopTagID
} else {
key = format.TagIDLegacy(index)
name = format.TagID(index)
}
if meta, ok := bag.Query.Metric.Name2Tag[name]; ok {
tag.Comment = meta.ValueComments[tag.Value]
tag.Raw = meta.Raw
tag.RawKind = meta.RawKind
}
if index == format.StringTopTagIndex {
k = format.LegacyStringTopTagID
name = format.StringTopTagID
} else {
k = format.TagIDLegacy(index)
name = format.TagID(index)
}
if meta, ok := ts.Series.Meta.Metric.Name2Tag[name]; ok {
v.Comment = meta.ValueComments[v.Value]
v.Raw = meta.Raw
v.RawKind = meta.RawKind
}
meta.Tags[key] = tag
}
meta.Tags[k] = v
}
res.Series.SeriesMeta = append(res.Series.SeriesMeta, meta)
res.Series.SeriesData = append(res.Series.SeriesData, s.Values)
}
if len(bag.Time) != 0 {
res.ExcessPointLeft = bag.Time[0] < req.from.Unix()
res.ExcessPointRight = req.to.Unix() < bag.Time[len(bag.Time)-1]
if len(ts.Time) != 0 {
res.ExcessPointLeft = ts.Time[0] < req.from.Unix()
res.ExcessPointRight = req.to.Unix() < ts.Time[len(ts.Time)-1]
}
if res.Series.SeriesData == nil {
// frontend expects not "null" value
Expand Down
94 changes: 44 additions & 50 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,29 +412,32 @@ func (h *Handler) GetTagValueID(qry promql.TagValueIDQuery) (int32, error) {
return res, err
}

func (h *Handler) QuerySeries(ctx context.Context, qry *promql.SeriesQuery) (promql.SeriesBag, func(), error) {
func (h *Handler) QuerySeries(ctx context.Context, qry *promql.SeriesQuery) (promql.Series, func(), error) {
ai := getAccessInfo(ctx)
if ai == nil {
panic("metric access violation") // should not happen
}
if !ai.canViewMetric(qry.Metric.Name) {
return promql.SeriesBag{}, func() {}, httpErr(http.StatusForbidden, fmt.Errorf("metric %q forbidden", qry.Metric.Name))
return promql.Series{}, func() {}, httpErr(http.StatusForbidden, fmt.Errorf("metric %q forbidden", qry.Metric.Name))
}
var (
version = promqlVersionOrDefault(qry.Options.Version)
what, qs, pq = getHandlerArgs(qry, ai)
lods = getHandlerLODs(qry, h.location)
shift = qry.Timescale.Offset - qry.Offset
timeLen = len(qry.Timescale.Time)
data, buffers []*[]float64
maxHost [][]int32
tagX = make(map[tsTags]int, len(qry.GroupBy))
cleanup = func() {
version = promqlVersionOrDefault(qry.Options.Version)
what, qs, pq = getHandlerArgs(qry, ai)
lods = getHandlerLODs(qry, h.location)
shift = qry.Timescale.Offset - qry.Offset
timeLen = len(qry.Timescale.Time)
buffers []*[]float64
tagX = make(map[tsTags]int, len(qry.GroupBy))
cleanup = func() {
for _, s := range buffers {
h.putFloatsSlice(s)
}
}
tx int // time index
tx int // time index
res = promql.Series{Meta: promql.SeriesMeta{
Metric: qry.Metric,
What: int(what),
}}
)
var step int64
if qry.Range != 0 {
Expand Down Expand Up @@ -465,7 +468,7 @@ func (h *Handler) QuerySeries(ctx context.Context, qry *promql.SeriesQuery) (pro
m, err := h.cache.Get(ctx, version, qs, &pq, li, qry.Options.AvoidCache)
if err != nil {
cleanup()
return promql.SeriesBag{}, nil, err
return promql.Series{}, nil, err
}
for _, col := range m {
for _, d := range col {
Expand All @@ -474,66 +477,57 @@ func (h *Handler) QuerySeries(ctx context.Context, qry *promql.SeriesQuery) (pro
j = tx + lod.lodInfo.getIndexForTimestamp(d.time, shift)
)
if !ok {
i = len(data)
i = len(res.Data)
tagX[d.tsTags] = i
s := h.getFloatsSlice(timeLen)
for k := range *s {
(*s)[k] = promql.NilValue
}
buffers = append(buffers, s)
data = append(data, s)
values := h.getFloatsSlice(timeLen)
buffers = append(buffers, values)
data := promql.SeriesData{Values: values}
if qry.MaxHost {
maxHost = append(maxHost, make([]int32, timeLen))
data.Tags.MaxHost = make([]int32, timeLen)
}
for k := range *data.Values {
(*data.Values)[k] = promql.NilValue
}
res.Data = append(res.Data, data)
}
(*data[i])[j] = selectTSValue(what, qry.MaxHost, qryRaw, step, &d)
(*res.Data[i].Values)[j] = selectTSValue(what, qry.MaxHost, qryRaw, step, &d)
if qry.MaxHost {
maxHost[i][j] = d.maxHost
res.Data[i].Tags.MaxHost[j] = d.maxHost
}
}
}
tx += lod.len
}
meta := make([]promql.SeriesMeta, len(tagX))
for t, i := range tagX {
for _, k := range qry.GroupBy {
switch k {
case format.StringTopTagID, qry.Metric.StringTopName:
meta[i].SetTag(promql.SeriesTag{
Index: format.StringTopTagIndex + promql.SeriesTagIndexOffset,
ID: format.StringTopTagID,
Name: qry.Metric.StringTopName,
SValue: emptyToUnspecified(t.tagStr.String()),
SValueSet: true,
res.AddTagAt(i, &promql.SeriesTag{
Metric: qry.Metric,
Index: format.StringTopTagIndex + promql.SeriesTagIndexOffset,
ID: format.StringTopTagID,
Name: qry.Metric.StringTopName,
SValue: emptyToUnspecified(t.tagStr.String()),
})
case format.ShardTagID:
meta[i].SetTag(promql.SeriesTag{
ID: promql.LabelShard,
Value: int32(t.shardNum),
ValueSet: true,
res.AddTagAt(i, &promql.SeriesTag{
Metric: qry.Metric,
ID: promql.LabelShard,
Value: int32(t.shardNum),
})
default:
if m, ok := qry.Metric.Name2Tag[k]; ok && m.Index < len(t.tag) {
meta[i].SetTag(promql.SeriesTag{
Index: m.Index + promql.SeriesTagIndexOffset,
ID: format.TagID(m.Index),
Name: m.Name,
Value: t.tag[m.Index],
ValueSet: true,
res.AddTagAt(i, &promql.SeriesTag{
Metric: qry.Metric,
Index: m.Index + promql.SeriesTagIndexOffset,
ID: format.TagID(m.Index),
Name: m.Name,
Value: t.tag[m.Index],
})
}
}
}
}
res := promql.SeriesBag{
Data: data,
Meta: meta,
MaxHost: maxHost,
Query: promql.SeriesQueryMeta{
Metric: qry.Metric,
What: int(what),
},
}
return res, cleanup, nil
}

Expand Down Expand Up @@ -602,7 +596,7 @@ func (h *Handler) QueryTagValueIDs(ctx context.Context, qry promql.TagValuesQuer
return res, nil
}

func (h *Handler) QuerySTagValues(ctx context.Context, qry promql.TagValuesQuery) ([]string, error) {
func (h *Handler) QueryStringTop(ctx context.Context, qry promql.TagValuesQuery) ([]string, error) {
ai := getAccessInfo(ctx)
if ai == nil {
panic("metric access violation") // should not happen
Expand Down
Loading

0 comments on commit 8ec36fa

Please sign in to comment.