Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into add_grpc_pooling
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Sep 4, 2024
2 parents 068b462 + 956fe47 commit bde2819
Show file tree
Hide file tree
Showing 20 changed files with 687 additions and 199 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7592](https://github.com/thanos-io/thanos/pull/7592) Ruler: Only increment `thanos_rule_evaluation_with_warnings_total` metric for non PromQL warnings.
- [#7614](https://github.com/thanos-io/thanos/pull/7614) *: fix debug log formatting.
- [#7492](https://github.com/thanos-io/thanos/pull/7492) Compactor: update filtered blocks list before second downsample pass.
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries

Expand All @@ -24,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7609](https://github.com/thanos-io/thanos/pull/7609) API: Add limit param to metadata APIs (series, label names, label values).
- [#7429](https://github.com/thanos-io/thanos/pull/7429): Reloader: introduce `TolerateEnvVarExpansionErrors` to allow suppressing errors when expanding environment variables in the configuration file. When set, this will ensure that the reloader won't consider the operation to fail when an unset environment variable is encountered. Note that all unset environment variables are left as is, whereas all set environment variables are expanded as usual.
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
- [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need

The following formula is used for calculating quorum:

```go mdox-exec="sed -n '990,999p' pkg/receive/handler.go"
```go mdox-exec="sed -n '999,1008p' pkg/receive/handler.go"
func (h *Handler) writeQuorum() int {
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
// would need to succeed all the time. Another way to think about it is when migrating
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.2.0
github.com/felixge/fgprof v0.9.4
github.com/felixge/fgprof v0.9.5
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-kit/log v0.2.1
Expand Down Expand Up @@ -55,9 +55,9 @@ require (
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_golang v1.20.2
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9
github.com/prometheus/common v0.58.0
github.com/prometheus/exporter-toolkit v0.11.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.53.2-0.20240718123124-e9dec5fc537b
Expand All @@ -75,7 +75,7 @@ require (
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/bridge/opentracing v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0
go.opentelemetry.io/otel/sdk v1.29.0
go.opentelemetry.io/otel/trace v1.29.0
Expand Down Expand Up @@ -109,7 +109,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
go.opentelemetry.io/contrib/samplers/jaegerremote v0.22.0
go.opentelemetry.io/contrib/samplers/jaegerremote v0.23.0
go.opentelemetry.io/otel/exporters/jaeger v1.16.0
)

Expand Down
21 changes: 13 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/fgprof v0.9.4 h1:ocDNwMFlnA0NU0zSB3I52xkO4sFXk80VK9lXjLClu88=
github.com/felixge/fgprof v0.9.4/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/felixge/fgprof v0.9.5 h1:8+vR6yu2vvSKn08urWyEuxx75NWPEvybbkBirEpsbVY=
github.com/felixge/fgprof v0.9.5/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
Expand Down Expand Up @@ -1263,8 +1263,8 @@ github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrb
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand All @@ -1280,8 +1280,8 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9 h1:WTZ/GBRTImL1HgRTEnJJcM2FuII7PXX1idCIEUJ8/r8=
github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9/go.mod h1:1Yn/UzXoahbVLk1sn6wsGiSiemz3XQejcaz9FIA1r+I=
github.com/prometheus/common v0.58.0 h1:N+N8vY4/23r6iYfD3UQZUoJPnUYAo7v6LG5XZxjZTXo=
github.com/prometheus/common v0.58.0/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
github.com/prometheus/exporter-toolkit v0.8.2/go.mod h1:00shzmJL7KxcsabLWcONwpyNEuWhREOnFqZW7vadFS0=
Expand Down Expand Up @@ -1446,6 +1446,11 @@ go.opentelemetry.io/contrib/propagators/ot v1.28.0 h1:rmlG+2pc5k5M7Y7izDrxAHZUIw
go.opentelemetry.io/contrib/propagators/ot v1.28.0/go.mod h1:MNgXIn+UrMbNGpd7xyckyo2LCHIgCdmdjEE7YNZGG+w=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.22.0 h1:OYxqumWcd1yaV/qvCt1B7Sru9OeUNGjeXq/oldx3AGk=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.22.0/go.mod h1:2tZTRqCbvx7nG57wUwd5NQpNVujOWnR84iPLllIH0Ok=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.23.0 h1:qKi9ntCcronqWqfuKxqrxZlZd82jXJEgGiAWH1+phxo=
go.opentelemetry.io/contrib/samplers/jaegerremote v0.23.0/go.mod h1:1kbAgQa5lgYC3rC6cE3jSxQ/Q13l33wv/WI8U+htwag=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel/bridge/opentracing v1.28.0 h1:erHvOxIUFnSXj/HuS5SqaKe2CbWSBskONXm2bEBxYgc=
Expand All @@ -1454,8 +1459,8 @@ go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmE
go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 h1:dIIDULZJpgdiHz5tXrTgKIMLkus6jEFa7x5SOKcyR7E=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0/go.mod h1:jlRVBe7+Z1wyxFSUs48L6OBQZ5JwH2Hg/Vbl+t9rAgI=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 h1:nSiV3s7wiCam610XcLbYOmMfJxB9gO4uK3Xgv5gmTgg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0/go.mod h1:hKn/e/Nmd19/x1gvIHwtOwVWM+VhuITSWip3JUDghj0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0 h1:JAv0Jwtl01UFiyWZEMiJZBiTlv5A50zNs8lsthXqIio=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.29.0/go.mod h1:QNKLmUEAq2QUbPQUfvw4fmv0bgbK7UlOSFCnXyfvSNc=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
Expand Down
9 changes: 6 additions & 3 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,15 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -753,7 +754,7 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -763,6 +764,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -773,7 +775,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -783,6 +785,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand Down
16 changes: 14 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req := storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Limit: int64(hints.Limit),
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
Expand Down Expand Up @@ -373,7 +374,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_values")
defer span.Finish()

Expand All @@ -384,12 +385,18 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand All @@ -411,7 +418,7 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_names")
defer span.Finish()

Expand All @@ -423,11 +430,16 @@ func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matcher
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand Down
41 changes: 25 additions & 16 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,31 +678,40 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
func (h *Handler) gatherWriteStats(rf int, writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
var stats tenantRequestStats = make(tenantRequestStats)

for er := range localWrites {
for tenant, series := range localWrites[er] {
samples := 0
for _, write := range writes {
for er := range write {
for tenant, series := range write[er] {
samples := 0

for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}
for _, ts := range series.timeSeries {
samples += len(ts.Samples)
}

if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples
if st, ok := stats[tenant]; ok {
st.timeseries += len(series.timeSeries)
st.totalSamples += samples

stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
stats[tenant] = st
} else {
stats[tenant] = requestStats{
timeseries: len(series.timeSeries),
totalSamples: samples,
}
}
}
}
}

// adjust counters by the replication factor
for tenant, st := range stats {
st.timeseries /= rf
st.totalSamples /= rf
stats[tenant] = st
}

return stats
}

Expand Down Expand Up @@ -733,7 +742,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
return stats, err
}

stats = h.gatherWriteStats(localWrites)
stats = h.gatherWriteStats(len(params.replicas), localWrites, remoteWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
Expand Down
6 changes: 5 additions & 1 deletion pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,14 @@ func TestMultiTSDBPrune(t *testing.T) {
testutil.Equals(t, 3, len(m.TSDBLocalClients()))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

g := sync.WaitGroup{}
defer func() { cancel(); g.Wait() }()

if test.bucket != nil {
g.Add(1)
go func() {
defer g.Done()
testutil.Ok(t, syncTSDBs(ctx, m, 10*time.Millisecond))
}()
}
Expand Down
Loading

0 comments on commit bde2819

Please sign in to comment.