Merge branch 'main' into poyzannur/remove-minTableAgeCheck
poyzannur authored Jan 4, 2024
2 parents 9c5553b + ce57448 commit c4ac05c
Showing 49 changed files with 1,913 additions and 319 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/checks.yml
@@ -0,0 +1,20 @@
name: Checks
on: [push]
jobs:
  checks:
    runs-on: ubuntu-latest
    env:
      BUILD_IN_CONTAINER: false
    container:
      image: grafana/loki-build-image:0.32.0
    steps:
      - uses: actions/checkout@v4
      - run: git config --global --add safe.directory "$GITHUB_WORKSPACE"
      - run: make lint
      - run: make check-doc
      - run: make check-mod
      - run: make validate-example-configs
      - run: make check-example-config-doc
      - run: make check-drone-drift
      - run: make check-generated-files
      - run: make test
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -42,6 +42,7 @@
* [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`.
* [10417](https://github.com/grafana/loki/pull/10417) **jeschkies** shard `quantile_over_time` range queries using probabilistic data structures.
* [11284](https://github.com/grafana/loki/pull/11284) **ashwanthgoli** Config: Adds `frontend.max-query-capacity` to tune per-tenant query capacity.
* [11539](https://github.com/grafana/loki/pull/11539) **kaviraj,ashwanthgoli** Support caching /series and /labels query results.
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.

##### Fixes
87 changes: 87 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
@@ -0,0 +1,87 @@
auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  instance_addr: 127.0.0.1
  path_prefix: /tmp/loki
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

query_range:
  align_queries_with_step: true
  cache_index_stats_results: true
  cache_results: true
  cache_volume_results: true
  cache_series_results: true
  series_results_cache:
    cache:
      default_validity: 12h
      memcached_client:
        consistent_hash: true
        addresses: "dns+localhost:11211"
        max_idle_conns: 16
        timeout: 500ms
        update_interval: 1m
  index_stats_results_cache:
    cache:
      default_validity: 12h
      memcached_client:
        consistent_hash: true
        addresses: "dns+localhost:11211"
        max_idle_conns: 16
        timeout: 500ms
        update_interval: 1m
  max_retries: 5
  results_cache:
    cache:
      default_validity: 12h
      memcached_client:
        consistent_hash: true
        addresses: "dns+localhost:11211"
        max_idle_conns: 16
        timeout: 500ms
        update_interval: 1m
  volume_results_cache:
    cache:
      default_validity: 12h
      memcached_client:
        consistent_hash: true
        addresses: "dns+localhost:11211"
        max_idle_conns: 16
        timeout: 500ms
        update_interval: 1m

schema_config:
  configs:
    - from: 2020-10-24
      store: tsdb
      object_store: filesystem
      schema: v12
      index:
        prefix: index_
        period: 24h

ruler:
  alertmanager_url: http://localhost:9093

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
#analytics:
#  reporting_enabled: false
42 changes: 42 additions & 0 deletions docs/sources/configure/_index.md
@@ -880,6 +880,40 @@ volume_results_cache:
  # compression. Supported values are: 'snappy' and ''.
  # CLI flag: -frontend.volume-results-cache.compression
  [compression: <string> | default = ""]

# Cache series query results.
# CLI flag: -querier.cache-series-results
[cache_series_results: <boolean> | default = false]

# If series_results_cache is not configured and cache_series_results is true,
# the config for the results cache is used.
series_results_cache:
  # The cache block configures the cache backend.
  # The CLI flags prefix for this block configuration is:
  # frontend.series-results-cache
  [cache: <cache_config>]

  # Use compression in cache. The default is an empty value '', which disables
  # compression. Supported values are: 'snappy' and ''.
  # CLI flag: -frontend.series-results-cache.compression
  [compression: <string> | default = ""]

# Cache label query results.
# CLI flag: -querier.cache-label-results
[cache_label_results: <boolean> | default = false]

# If label_results_cache is not configured and cache_label_results is true, the
# config for the results cache is used.
label_results_cache:
  # The cache block configures the cache backend.
  # The CLI flags prefix for this block configuration is:
  # frontend.label-results-cache
  [cache: <cache_config>]

  # Use compression in cache. The default is an empty value '', which disables
  # compression. Supported values are: 'snappy' and ''.
  # CLI flag: -frontend.label-results-cache.compression
  [compression: <string> | default = ""]
```

### ruler
@@ -2844,6 +2878,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 1h]

# Split metadata queries by a time interval and execute in parallel. The value 0
# disables splitting metadata queries by time. This also determines how cache
# keys are chosen when label/series result caching is enabled.
# CLI flag: -querier.split-metadata-queries-by-interval
[split_metadata_queries_by_interval: <duration> | default = 1d]

# Limit queries that can be sharded. Queries within the time range of now and
# now minus this sharding lookback are not sharded. The default value of 0s
# disables the lookback, causing sharding of all queries at all times.
@@ -4276,6 +4316,8 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `bloom-gateway-client.cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.label-results-cache`
- `frontend.series-results-cache`
- `frontend.volume-results-cache`
- `store.chunks-cache`
- `store.index-cache-read`
4 changes: 2 additions & 2 deletions docs/sources/send-data/promtail/cloud/ecs/_index.md
@@ -88,13 +88,13 @@ Our [task definition][task] will be made of two containers, the [Firelens][Firel
Let's download the task definition; we'll go through the most important parts.

```bash
-curl https://raw.githubusercontent.com/grafana/loki/main/docs/sources/clients/aws/ecs/ecs-task.json > ecs-task.json
+curl https://raw.githubusercontent.com/grafana/loki/main/docs/sources/send-data/promtail/cloud/ecs/ecs-task.json > ecs-task.json
```

```json
 {
   "essential": true,
-  "image": "grafana/fluent-bit-plugin-loki:2.0.0-amd64",
+  "image": "grafana/fluent-bit-plugin-loki:2.9.3-amd64",
   "name": "log_router",
   "firelensConfiguration": {
     "type": "fluentbit",
2 changes: 1 addition & 1 deletion docs/sources/send-data/promtail/cloud/ecs/ecs-task.json
@@ -2,7 +2,7 @@
"containerDefinitions": [
{
"essential": true,
"image": "grafana/fluent-bit-plugin-loki:1.6.0-amd64",
"image": "grafana/fluent-bit-plugin-loki:2.9.3-amd64",
"name": "log_router",
"firelensConfiguration": {
"type": "fluentbit",
8 changes: 4 additions & 4 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -465,10 +465,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
     }
     metaSearchParams := bloomshipper.MetaSearchParams{
         TenantID:       job.tenantID,
-        MinFingerprint: uint64(job.minFp),
-        MaxFingerprint: uint64(job.maxFp),
-        StartTimestamp: int64(job.from),
-        EndTimestamp:   int64(job.through),
+        MinFingerprint: job.minFp,
+        MaxFingerprint: job.maxFp,
+        StartTimestamp: job.from,
+        EndTimestamp:   job.through,
     }
     var metas []bloomshipper.Meta
     //TODO Configure pool for these to avoid allocations
6 changes: 3 additions & 3 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -135,8 +135,8 @@ func buildBlockFromBlooms(
             TableName:      job.tableName,
             MinFingerprint: uint64(job.minFp),
             MaxFingerprint: uint64(job.maxFp),
-            StartTimestamp: int64(job.from),
-            EndTimestamp:   int64(job.through),
+            StartTimestamp: job.from,
+            EndTimestamp:   job.through,
             Checksum:       checksum,
         },
         IndexPath: job.indexPath,
@@ -148,7 +148,7 @@
 }
 
 func createLocalDirName(workingDir string, job Job) string {
-    dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through)
+    dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through)
     return filepath.Join(workingDir, dir)
 }

4 changes: 2 additions & 2 deletions pkg/bloomcompactor/chunkcompactor_test.go
@@ -121,8 +121,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
     require.Equal(t, job.tableName, compactedBlock.TableName)
     require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint)
     require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint)
-    require.Equal(t, chunkRef1.MinTime, compactedBlock.StartTimestamp)
-    require.Equal(t, chunkRef2.MaxTime, compactedBlock.EndTimestamp)
+    require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp)
+    require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp)
     require.Equal(t, indexPath, compactedBlock.IndexPath)
 }

4 changes: 2 additions & 2 deletions pkg/bloomcompactor/mergecompactor.go
@@ -137,8 +137,8 @@ func mergeCompactChunks(logger log.Logger,
             TableName:      job.tableName,
             MinFingerprint: uint64(job.minFp),
             MaxFingerprint: uint64(job.maxFp),
-            StartTimestamp: int64(job.from),
-            EndTimestamp:   int64(job.through),
+            StartTimestamp: job.from,
+            EndTimestamp:   job.through,
             Checksum:       checksum,
         },
         IndexPath: job.indexPath,
16 changes: 14 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -75,7 +75,13 @@ func TestBloomGateway_StartStopService(t *testing.T) {
     t.Cleanup(cm.Unregister)
 
     p := config.PeriodConfig{
-        From: parseDayTime("2023-09-01"),
+        From: parseDayTime("2023-09-01"),
+        IndexTables: config.IndexPeriodicTableConfig{
+            PeriodicTableConfig: config.PeriodicTableConfig{
+                Prefix: "index_",
+                Period: 24 * time.Hour,
+            },
+        },
         IndexType:  config.TSDBType,
         ObjectType: config.StorageTypeFileSystem,
         Schema:     "v13",
@@ -137,7 +143,13 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
     t.Cleanup(cm.Unregister)
 
     p := config.PeriodConfig{
-        From: parseDayTime("2023-09-01"),
+        From: parseDayTime("2023-09-01"),
+        IndexTables: config.IndexPeriodicTableConfig{
+            PeriodicTableConfig: config.PeriodicTableConfig{
+                Prefix: "index_",
+                Period: 24 * time.Hour,
+            },
+        },
         IndexType:  config.TSDBType,
         ObjectType: config.StorageTypeFileSystem,
         Schema:     "v13",
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
@@ -420,7 +420,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
     // Nil check for performance reasons, to avoid dynamic lookup and/or no-op
     // function calls that cannot be inlined.
     if d.tee != nil {
-        d.tee.Duplicate(streams)
+        d.tee.Duplicate(tenantID, streams)
     }
 
     const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
6 changes: 5 additions & 1 deletion pkg/distributor/distributor_test.go
@@ -1251,12 +1251,14 @@ func (s *fakeRateStore) RateFor(_ string, _ uint64) (int64, float64) {
 type mockTee struct {
     mu         sync.Mutex
     duplicated [][]KeyedStream
+    tenant     string
 }
 
-func (mt *mockTee) Duplicate(streams []KeyedStream) {
+func (mt *mockTee) Duplicate(tenant string, streams []KeyedStream) {
     mt.mu.Lock()
     defer mt.mu.Unlock()
     mt.duplicated = append(mt.duplicated, streams)
+    mt.tenant = tenant
 }

func TestDistributorTee(t *testing.T) {
@@ -1307,5 +1309,7 @@
         for j, streams := range td.Streams {
             assert.Equal(t, tee.duplicated[i][j].Stream.Entries, streams.Entries)
         }
+
+        require.Equal(t, "test", tee.tenant)
     }
 }
4 changes: 2 additions & 2 deletions pkg/distributor/tee.go
@@ -1,6 +1,6 @@
 package distributor
 
-// Tee imlpementations can duplicate the log streams to another endpoint.
+// Tee implementations can duplicate the log streams to another endpoint.
 type Tee interface {
-    Duplicate([]KeyedStream)
+    Duplicate(tenant string, streams []KeyedStream)
 }
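
For orientation, here is a minimal sketch of what an implementation of the updated interface could look like. `Tee` and `KeyedStream` come from the `distributor` package above; the `forwardingTee` type, its channel-based sink, and the drop-on-saturation policy are hypothetical illustrations, not code from this commit:

```go
package distributor

import "log"

// tenantStreams pairs a tenant ID with the streams it pushed, matching
// the information the new Duplicate signature carries.
type tenantStreams struct {
	tenant  string
	streams []KeyedStream
}

// forwardingTee is a hypothetical Tee that hands duplicated streams,
// now keyed by tenant, to an asynchronous sink.
type forwardingTee struct {
	sink chan tenantStreams
}

// Duplicate implements Tee. Because the distributor invokes it on the
// push hot path (see the nil-check in Push above), it must not block,
// so this sketch drops a batch when the sink is saturated.
func (t *forwardingTee) Duplicate(tenant string, streams []KeyedStream) {
	select {
	case t.sink <- tenantStreams{tenant: tenant, streams: streams}:
	default:
		log.Printf("tee: dropped %d streams for tenant %s", len(streams), tenant)
	}
}
```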
28 changes: 14 additions & 14 deletions pkg/logql/downstream.go
@@ -235,6 +235,12 @@
     Params Params
 }
 
+type Resp struct {
+    I   int
+    Res logqlmodel.Result
+    Err error
+}
+
 // Downstreamer is an interface for deferring responsibility for query execution.
 // It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs.
 type Downstreamer interface {
@@ -375,24 +381,18 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(

     results, err := ev.Downstream(ctx, queries)
     if err != nil {
-        return nil, fmt.Errorf("error running quantile sketch downstream query: %w", err)
+        return nil, err
     }
 
-    xs := make([]StepEvaluator, 0, len(queries))
-    for _, res := range results {
-        if res.Data.Type() != QuantileSketchMatrixType {
-            return nil, fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), QuantileSketchMatrixType)
-        }
-        data, ok := res.Data.(ProbabilisticQuantileMatrix)
-        if !ok {
-            return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
-        }
-        stepper := NewQuantileSketchMatrixStepEvaluator(data, params)
-        xs = append(xs, stepper)
+    if len(results) != 1 {
+        return nil, fmt.Errorf("unexpected results length for sharded quantile: got (%d), want (1)", len(results))
     }
 
-    inner := NewQuantileSketchMergeStepEvaluator(xs)
-
+    matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix)
+    if !ok {
+        return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", results[0].Data)
+    }
+    inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
     return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil
 
     default:
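
The hunk above drops the per-shard step-evaluator merge: the evaluator now expects the downstream results to arrive already combined into a single `ProbabilisticQuantileMatrix`. The newly exported `Resp` type pairs each result (or error) with the index `I` of the query that produced it. Below is a hedged sketch of the fan-out/fan-in pattern such an indexed response enables, assuming the `pkg/logql` package context (`Resp`, `DownstreamQuery`, and `logqlmodel.Result` appear in this diff or package); `downstreamAll` and its `run` callback are invented for illustration:

```go
// downstreamAll runs every query concurrently and uses Resp.I to slot
// each response back into the position of the query that produced it,
// so callers see results in issue order regardless of completion order.
func downstreamAll(
	ctx context.Context,
	queries []DownstreamQuery,
	run func(context.Context, DownstreamQuery) (logqlmodel.Result, error),
) ([]logqlmodel.Result, error) {
	ch := make(chan Resp, len(queries))
	for i, q := range queries {
		go func(i int, q DownstreamQuery) {
			res, err := run(ctx, q)
			ch <- Resp{I: i, Res: res, Err: err} // I records the issue order
		}(i, q)
	}

	results := make([]logqlmodel.Result, len(queries))
	for range queries {
		r := <-ch
		if r.Err != nil {
			return nil, r.Err
		}
		results[r.I] = r.Res // reassemble by index
	}
	return results, nil
}
```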
