From d9a0af93fe1f50b68041728a0034a1515c134d5c Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 27 Jun 2024 13:48:39 +0200 Subject: [PATCH 1/6] Add -ingest-storage.kafka.target-consumer-lag-at-startup support Signed-off-by: Marco Pracucci --- cmd/mimir/config-descriptor.json | 12 +- cmd/mimir/help-all.txt.tmpl | 4 +- cmd/mimir/help.txt.tmpl | 4 +- .../configuration-parameters/index.md | 15 +- pkg/storage/ingest/config.go | 21 +- pkg/storage/ingest/config_test.go | 30 +++ pkg/storage/ingest/reader.go | 118 +++++++++-- pkg/storage/ingest/reader_test.go | 185 +++++++++++++++--- 8 files changed, 338 insertions(+), 51 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index d0d1d7bf05f..056999ebe27 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6578,11 +6578,21 @@ "fieldFlag": "ingest-storage.kafka.consume-from-timestamp-at-startup", "fieldType": "int" }, + { + "kind": "field", + "name": "target_consumer_lag_at_startup", + "required": false, + "desc": "The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.", + "fieldValue": null, + "fieldDefaultValue": 2000000000, + "fieldFlag": "ingest-storage.kafka.target-consumer-lag-at-startup", + "fieldType": "duration" + }, { "kind": "field", "name": "max_consumer_lag_at_startup", "required": false, - "desc": "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.", + "desc": "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.", "fieldValue": null, "fieldDefaultValue": 15000000000, "fieldFlag": "ingest-storage.kafka.max-consumer-lag-at-startup", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9a57205158d..36853739f8d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1332,9 +1332,11 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.last-produced-offset-retry-timeout duration How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration - The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.target-consumer-lag-at-startup duration + The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. -ingest-storage.kafka.wait-strong-read-consistency-timeout duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index fea0cb61f09..c38af96352a 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -414,9 +414,11 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.last-produced-offset-retry-timeout duration How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration - The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.target-consumer-lag-at-startup duration + The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. -ingest-storage.kafka.wait-strong-read-consistency-timeout duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index fb7dc93f074..5f9c4efd3ee 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3763,10 +3763,19 @@ kafka: # CLI flag: -ingest-storage.kafka.consume-from-timestamp-at-startup [consume_from_timestamp_at_startup: | default = 0] - # The maximum tolerated lag before a consumer is considered to have caught up + # The best-effort maximum lag a consumer tries to achieve at startup. Set both + # -ingest-storage.kafka.target-consumer-lag-at-startup and + # -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting + # for maximum consumer lag being honored at startup. + # CLI flag: -ingest-storage.kafka.target-consumer-lag-at-startup + [target_consumer_lag_at_startup: | default = 2s] + + # The guaranteed maximum lag before a consumer is considered to have caught up # reading from a partition at startup, becomes ACTIVE in the hash ring and - # passes the readiness check. Set 0 to disable waiting for maximum consumer - # lag being honored at startup. + # passes the readiness check. Set both + # -ingest-storage.kafka.target-consumer-lag-at-startup and + # -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting + # for maximum consumer lag being honored at startup. # CLI flag: -ingest-storage.kafka.max-consumer-lag-at-startup [max_consumer_lag_at_startup: | default = 15s] diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index de5484aa50d..e96726cc3cd 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -17,6 +17,10 @@ const ( consumeFromStart = "start" consumeFromEnd = "end" consumeFromTimestamp = "timestamp" + + kafkaConfigFlagPrefix = "ingest-storage.kafka" + targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup" + maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup" ) var ( @@ -25,6 +29,8 @@ var ( ErrInvalidWriteClients = errors.New("the configured number of write clients is invalid (must be greater than 0)") ErrInvalidConsumePosition = errors.New("the configured consume position is invalid") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) + ErrInconsistentConsumerLagAtStartup = fmt.Errorf("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") + ErrInvalidMaxConsumerLagAtStartup = fmt.Errorf("the configured max consumer lag at startup must greater or equal than the configured target consumer lag") consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp} ) @@ -38,7 +44,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.") - cfg.KafkaConfig.RegisterFlagsWithPrefix("ingest-storage.kafka", f) + cfg.KafkaConfig.RegisterFlagsWithPrefix(kafkaConfigFlagPrefix, f) cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration", f) } @@ -73,6 +79,7 @@ type KafkaConfig struct { ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"` ConsumeFromTimestampAtStartup int64 `yaml:"consume_from_timestamp_at_startup"` + TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` @@ -106,7 +113,11 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+".consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", "))) f.Int64Var(&cfg.ConsumeFromTimestampAtStartup, prefix+".consume-from-timestamp-at-startup", 0, fmt.Sprintf("Milliseconds timestamp after which the consumption of the partition starts at startup. Only applies when consume-from-position-at-startup is %s", consumeFromTimestamp)) - f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.") + + howToDisableConsumerLagAtStartup := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", targetConsumerLagAtStartupFlag, maxConsumerLagAtStartupFlag) + f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup) + f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup) + f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.") f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") @@ -141,6 +152,12 @@ func (cfg *KafkaConfig) Validate() error { if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit { return ErrInvalidProducerMaxRecordSizeBytes } + if (cfg.TargetConsumerLagAtStartup != 0) != (cfg.MaxConsumerLagAtStartup != 0) { + return ErrInconsistentConsumerLagAtStartup + } + if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup { + return ErrInvalidMaxConsumerLagAtStartup + } return nil } diff --git a/pkg/storage/ingest/config_test.go b/pkg/storage/ingest/config_test.go index 9e23fb7b0af..375179c37e3 100644 --- a/pkg/storage/ingest/config_test.go +++ b/pkg/storage/ingest/config_test.go @@ -95,6 +95,36 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: ErrInvalidProducerMaxRecordSizeBytes, }, + "should fail if target consumer lag is enabled but max consumer lag is not": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second + cfg.KafkaConfig.MaxConsumerLagAtStartup = 0 + }, + expectedErr: ErrInconsistentConsumerLagAtStartup, + }, + "should fail if max consumer lag is enabled but target consumer lag is not": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 0 + cfg.KafkaConfig.MaxConsumerLagAtStartup = 2 * time.Second + }, + expectedErr: ErrInconsistentConsumerLagAtStartup, + }, + "should fail if target consumer lag is > max consumer lag": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second + cfg.KafkaConfig.MaxConsumerLagAtStartup = 1 * time.Second + }, + expectedErr: ErrInvalidMaxConsumerLagAtStartup, + }, } for testName, testData := range tests { diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 6019bed27ef..678f13127c3 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -37,6 +37,7 @@ const ( var ( errWaitStrongReadConsistencyTimeoutExceeded = errors.Wrap(context.DeadlineExceeded, "wait strong read consistency timeout exceeded") + errWaitTargetLagDeadlineExceeded = errors.Wrap(context.DeadlineExceeded, "target lag deadline exceeded") ) type record struct { @@ -136,9 +137,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { } // Enforce the max consumer lag (if enabled). - if maxLag := r.kafkaCfg.MaxConsumerLagAtStartup; maxLag > 0 { + if targetLag, maxLag := r.kafkaCfg.TargetConsumerLagAtStartup, r.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 { if startOffset != kafkaOffsetEnd { - if err := r.processNextFetchesUntilMaxLagHonored(ctx, maxLag); err != nil { + if err := r.processNextFetchesUntilTargetOrMaxLagHonored(ctx, targetLag, maxLag); err != nil { return err } } else { @@ -191,20 +192,78 @@ func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver r.notifyLastConsumedOffset(fetches) } -func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Context, maxLag time.Duration) error { - level.Info(r.logger).Log("msg", "partition reader is starting to consume partition until max consumer lag is honored", "max_lag", maxLag) +// processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored. +// This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be +// reached once this function successfully returns (only maxLag is guaranteed). +func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration) error { + logger := log.With(r.logger, "target_lag", targetLag, "max_lag", maxLag) + level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored") + attempts := []func() (currLag time.Duration, _ error){ + // First process fetches until at least the max lag is honored. + func() (time.Duration, error) { + return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger) + }, + + // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag + // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). + // The timeout is equal to the max lag. This is done because we expect at least a 1x replay speed + // from Kafka (which means at most it takes 1s to ingest 1s of data). + func() (time.Duration, error) { + timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) + defer cancel() + + return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger) + }, + + // If the target lag hasn't been reached with the previous attempt that we'll move on. However, + // we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored. + func() (time.Duration, error) { + return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger) + }, + } + + var currLag time.Duration + for _, attempt := range attempts { + var err error + + currLag, err = attempt() + if errors.Is(err, errWaitTargetLagDeadlineExceeded) { + continue + } + if err != nil { + return err + } + if currLag <= targetLag { + level.Info(logger).Log( + "msg", "partition reader consumed partition and current lag is lower than configured target consumer lag", + "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), + "current_lag", currLag, + ) + return nil + } + } + + level.Warn(logger).Log( + "msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag", + "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), + "current_lag", currLag, + ) + return nil +} + +func (r *PartitionReader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger) (currLag time.Duration, _ error) { boff := backoff.New(ctx, backoff.Config{ - MinBackoff: 250 * time.Millisecond, - MaxBackoff: 2 * time.Second, - MaxRetries: 0, // retry forever + MinBackoff: 100 * time.Millisecond, + MaxBackoff: time.Second, + MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded). }) for boff.Ongoing() { // Send a direct request to the Kafka backend to fetch the partition start offset. partitionStartOffset, err := r.offsetReader.FetchPartitionStartOffset(ctx) if err != nil { - level.Warn(r.logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) + level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) boff.Wait() continue } @@ -212,25 +271,25 @@ func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Conte // Send a direct request to the Kafka backend to fetch the last produced offset. // We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further // latency. + lastProducedOffsetRequestedAt := time.Now() lastProducedOffset, err := r.offsetReader.FetchLastProducedOffset(ctx) if err != nil { - level.Warn(r.logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) + level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) boff.Wait() continue } - lastProducedOffsetFetchedAt := time.Now() - - // Ensure there're some records to consume. For example, if the partition has been inactive for a long + // Ensure there are some records to consume. For example, if the partition has been inactive for a long // time and all its records have been deleted, the partition start offset may be > 0 but there are no // records to actually consume. if partitionStartOffset > lastProducedOffset { - level.Info(r.logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) - return nil + level.Info(logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + return 0, nil } - // This message is NOT expected to be logged with a very high rate. - level.Info(r.logger).Log("msg", "partition reader is consuming records to honor max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + // This message is NOT expected to be logged with a very high rate. In this log we display the last measured + // lag. If we don't have it (lag is zero value), then it will not be logged. + level.Info(loggerWithCurrentLag(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) for boff.Ongoing() { // Continue reading until we reached the desired offset. @@ -243,18 +302,27 @@ func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Conte } if boff.Err() != nil { - return boff.Err() + // TODO should be moved to dskit's backoff + if ctx.Err() != nil { + return 0, context.Cause(ctx) + } + + return 0, boff.Err() } // If it took less than the max desired lag to replay the partition // then we can stop here, otherwise we'll have to redo it. - if currLag := time.Since(lastProducedOffsetFetchedAt); currLag <= maxLag { - level.Info(r.logger).Log("msg", "partition reader consumed partition and current lag is less than configured max consumer lag", "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), "current_lag", currLag, "max_lag", maxLag) - return nil + if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag { + return currLag, nil } } - return boff.Err() + // TODO should be moved to dskit's backoff + if ctx.Err() != nil { + return 0, context.Cause(ctx) + } + + return 0, boff.Err() } func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches { @@ -279,6 +347,14 @@ func isErrFetch(fetch kgo.Fetch) bool { return false } +func loggerWithCurrentLag(logger log.Logger, currLag time.Duration) log.Logger { + if currLag <= 0 { + return logger + } + + return log.With(logger, "current_lag", currLag) +} + func (r *PartitionReader) logFetchErrors(fetches kgo.Fetches) { mErr := multierror.New() fetches.EachError(func(topic string, partition int32, err error) { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 6401f15c3da..183fc532425 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" @@ -351,7 +352,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { ) // Create and start the reader. We expect the reader to start even if partition is empty. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -363,7 +364,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { `), "cortex_ingest_storage_reader_last_consumed_offset")) }) - t.Run("should immediately switch to Running state if configured max lag is 0", func(t *testing.T) { + t.Run("should immediately switch to Running state if configured target / max lag is 0", func(t *testing.T) { t.Parallel() var ( @@ -386,7 +387,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records") // Create and start the reader. We expect the reader to start even if Fetch is failing. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(0), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(0, 0), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -398,7 +399,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { `), "cortex_ingest_storage_reader_last_consumed_offset")) }) - t.Run("should consume partition from start if last committed offset is missing and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from start if last committed offset is missing and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -432,7 +433,11 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -456,6 +461,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + assert.Equal(t, int64(2), consumedRecordsCount.Load()) // We expect the last consumed offset to be tracked in a metric. @@ -468,7 +476,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from start if last committed offset is missing and wait until max lag is honored and retry if a failure occurs when fetching last produced offset", func(t *testing.T) { + t.Run("should consume partition from start if last committed offset is missing and wait until target lag is honored and retry if a failure occurs when fetching last produced offset", func(t *testing.T) { t.Parallel() var ( @@ -502,7 +510,11 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -526,6 +538,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + assert.Equal(t, int64(2), consumedRecordsCount.Load()) // We expect the last consumed offset to be tracked in a metric. @@ -538,7 +553,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from end if position=end, and skip honoring max lag", func(t *testing.T) { + t.Run("should consume partition from end if position=end, and skip honoring target / max lag", func(t *testing.T) { t.Parallel() var ( @@ -578,7 +593,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records before starting the reader") // Create and start the reader. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -620,7 +635,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from start if position=start, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from start if position=start, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -670,7 +685,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromStart), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromStart), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -693,6 +713,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + // We expect the reader to have consumed the partition from start. test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { consumedRecordsMx.Lock() @@ -712,7 +735,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { } }) - t.Run("should consume partition from the timestamp if position=timestamp, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from the timestamp if position=timestamp, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -764,7 +787,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { consumedRecordsMx.Unlock() reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromTimestampAtStartup(consumeFromTs.UnixMilli()), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromTimestampAtStartup(consumeFromTs.UnixMilli()), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -787,6 +815,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + // We expect the reader to have consumed the partition from the third record. test.Poll(t, time.Second, []string{"record-3", "record-4"}, func() interface{} { consumedRecordsMx.Lock() @@ -805,7 +836,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from last committed offset if position=last-offset, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from last committed offset if position=last-offset, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -854,7 +885,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromLastOffset), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromLastOffset), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -897,6 +933,99 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { } }) + t.Run("should consume partition from last committed offset if position=last-offset, and wait until max lag is honored if can't honor target lag", func(t *testing.T) { + t.Parallel() + + var ( + cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + writeClient = newKafkaProduceClient(t, clusterAddr) + nextRecordID = atomic.NewInt32(0) + targetLag = 500 * time.Millisecond + maxLag = 2 * time.Second + ) + + // Wait until all goroutines used in this test have done. + testRoutines := sync.WaitGroup{} + t.Cleanup(testRoutines.Wait) + + // Create a channel to signal goroutines once the test has done. + testDone := make(chan struct{}) + t.Cleanup(func() { + close(testDone) + }) + + consumer := consumerFunc(func(_ context.Context, _ []record) error { + return nil + }) + + cluster.ControlKey(int16(kmsg.ListOffsets), func(kreq kmsg.Request) (kmsg.Response, error, bool) { + cluster.KeepControl() + + // Slow down each ListOffsets request to take longer than the target lag. + req := kreq.(*kmsg.ListOffsetsRequest) + if len(req.Topics) > 0 && len(req.Topics[0].Partitions) > 0 && req.Topics[0].Partitions[0].Timestamp == kafkaOffsetEnd { + cluster.SleepControl(func() { + testRoutines.Add(1) + defer testRoutines.Done() + + delay := time.Duration(float64(targetLag) * 1.1) + t.Logf("artificially slowing down OffsetFetch request by %s", delay.String()) + + select { + case <-testDone: + case <-time.After(delay): + } + }) + } + + return nil, nil, false + }) + + // Produce a record. + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte(fmt.Sprintf("record-%d", nextRecordID.Inc()))) + t.Log("produced 1 record") + + // Continue to produce records at a high pace, so that we simulate the case there are always new + // records to fetch. + testRoutines.Add(1) + go func() { + defer testRoutines.Done() + + for { + select { + case <-testDone: + return + + case <-time.After(targetLag / 2): + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte(fmt.Sprintf("record-%d", nextRecordID.Inc()))) + t.Log("produced 1 record") + } + } + }() + + // Create and start the reader. + reg := prometheus.NewPedanticRegistry() + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromLastOffset), + withTargetAndMaxConsumerLagAtStartup(targetLag, maxLag), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) + require.NoError(t, reader.StartAsync(ctx)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) + }) + + // We expect the reader to catch up, and then switch to Running state. + test.Poll(t, maxLag*5, services.Running, func() interface{} { + return reader.State() + }) + + // We expect the reader to have switched to running because max consumer lag has been honored + // but target lag has not. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag") + }) + t.Run("should not wait indefinitely if context is cancelled while fetching last produced offset", func(t *testing.T) { t.Parallel() @@ -915,7 +1044,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) // Create and start the reader. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) readerCtx, cancelReaderCtx := context.WithCancel(ctx) require.NoError(t, reader.StartAsync(readerCtx)) @@ -958,7 +1087,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records") // Create and start the reader. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) readerCtx, cancelReaderCtx := context.WithCancel(ctx) require.NoError(t, reader.StartAsync(readerCtx)) @@ -1033,7 +1162,8 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { reg := prometheus.NewPedanticRegistry() reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromPosition), - withMaxConsumerLagAtStartup(time.Second), + withConsumeFromTimestampAtStartup(time.Now().UnixMilli()), // For the test where position=timestamp. + withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) @@ -1072,7 +1202,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1103,7 +1233,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1144,7 +1274,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1402,8 +1532,9 @@ func withLastProducedOffsetPollInterval(i time.Duration) func(cfg *readerTestCfg } } -func withMaxConsumerLagAtStartup(maxLag time.Duration) func(cfg *readerTestCfg) { +func withTargetAndMaxConsumerLagAtStartup(targetLag, maxLag time.Duration) func(cfg *readerTestCfg) { return func(cfg *readerTestCfg) { + cfg.kafka.TargetConsumerLagAtStartup = targetLag cfg.kafka.MaxConsumerLagAtStartup = maxLag } } @@ -1433,6 +1564,12 @@ func withRegistry(reg *prometheus.Registry) func(cfg *readerTestCfg) { } } +func withLogger(logger log.Logger) func(cfg *readerTestCfg) { + return func(cfg *readerTestCfg) { + cfg.logger = logger + } +} + func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partitionID int32, consumer recordConsumer) *readerTestCfg { return &readerTestCfg{ registry: prometheus.NewPedanticRegistry(), @@ -1448,6 +1585,10 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32 for _, o := range opts { o(cfg) } + + // Ensure the config is valid. + require.NoError(t, cfg.kafka.Validate()) + reader, err := newPartitionReader(cfg.kafka, cfg.partitionID, "test-group", cfg.consumer, cfg.logger, cfg.registry) require.NoError(t, err) From 887015dcac49abfb87627497c2b09518e461af41 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 2 Jul 2024 14:47:40 +0200 Subject: [PATCH 2/6] Updated doc Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + docs/sources/mimir/manage/mimir-runbooks/_index.md | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f8ee6249f..074629dc58a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [CHANGE] Store-gateway: enabled `-blocks-storage.bucket-store.max-concurrent-queue-timeout` by default with a timeout of 5 seconds. #8496 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 +* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. * New configuration options: diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index f14a1fd973e..3a89ead6ff9 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1388,7 +1388,9 @@ This alert fires when "receive delay" reported by ingester during "starting" pha How it **works**: -- When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The maximum tolerated lag before an ingester is considered to have caught up reading from a partition at startup can be configured via `-ingest-storage.kafka.max-consumer-lag-at-startup`. +- When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The are two configuration options controlling the desired and tolerated lag before a ingester is considered to have caught up reading from a partition at startup: + - `-ingest-storage.kafka.max-consumer-lag-at-startup`: this is the guaranteed maximum lag before a ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't passes the readiness check until the measured lag is below this setting. + - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that a ingester should try to achieve at startup. This setting is a best-effort. The ingester is granted a "grace period" to have the measured lag below this setting, but the ingester starts anyway if the target lag hasn't been reached within the "grace period", as far as the max lag is honored. - Each record has a timestamp when it was sent to Kafka by the distributor. When ingester reads the record, it computes "receive delay" as a difference between current time (when record was read) and time when record was sent to Kafka. This receive delay is reported in the metric `cortex_ingest_storage_reader_receive_delay_seconds`. You can see receive delay on `Mimir / Writes` dashboard, in section "Ingester (ingest storage – end-to-end latency)". - Under normal conditions when ingester is processing records faster than records are appearing, receive delay should be decreasing, until `-ingest-storage.kafka.max-consumer-lag-at-startup` is honored. - When ingester is starting, and observed "receive delay" is increasing, alert is raised. From 583dbaaf02f8a5c373fcf12f7752fd639ff28016 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 2 Jul 2024 15:38:24 +0200 Subject: [PATCH 3/6] Renamed loggerWithCurrentLag() to loggerWithCurrentLagIfSet() Signed-off-by: Marco Pracucci --- pkg/storage/ingest/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 678f13127c3..86ca6164bde 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -289,7 +289,7 @@ func (r *PartitionReader) processNextFetchesUntilLagHonored(ctx context.Context, // This message is NOT expected to be logged with a very high rate. In this log we display the last measured // lag. If we don't have it (lag is zero value), then it will not be logged. - level.Info(loggerWithCurrentLag(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) for boff.Ongoing() { // Continue reading until we reached the desired offset. @@ -347,7 +347,7 @@ func isErrFetch(fetch kgo.Fetch) bool { return false } -func loggerWithCurrentLag(logger log.Logger, currLag time.Duration) log.Logger { +func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger { if currLag <= 0 { return logger } From 6de7564285fe36df77a8a0d7bd12bfd9214153fc Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Tue, 2 Jul 2024 15:48:52 +0200 Subject: [PATCH 4/6] Clarify comment and doc Signed-off-by: Marco Pracucci --- docs/sources/mimir/manage/mimir-runbooks/_index.md | 2 +- pkg/storage/ingest/reader.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index 3a89ead6ff9..c66dc52993f 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1390,7 +1390,7 @@ How it **works**: - When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The are two configuration options controlling the desired and tolerated lag before a ingester is considered to have caught up reading from a partition at startup: - `-ingest-storage.kafka.max-consumer-lag-at-startup`: this is the guaranteed maximum lag before a ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't passes the readiness check until the measured lag is below this setting. - - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that a ingester should try to achieve at startup. This setting is a best-effort. The ingester is granted a "grace period" to have the measured lag below this setting, but the ingester starts anyway if the target lag hasn't been reached within the "grace period", as far as the max lag is honored. + - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that a ingester should try to achieve at startup. This setting is a best-effort. The ingester is granted a "grace period" to have the measured lag below this setting, but the ingester starts anyway if the target lag hasn't been reached within the "grace period", as far as the max lag is honored. The "grace period" is equal to the configured `-ingest-storage.kafka.max-consumer-lag-at-startup`. - Each record has a timestamp when it was sent to Kafka by the distributor. When ingester reads the record, it computes "receive delay" as a difference between current time (when record was read) and time when record was sent to Kafka. This receive delay is reported in the metric `cortex_ingest_storage_reader_receive_delay_seconds`. You can see receive delay on `Mimir / Writes` dashboard, in section "Ingester (ingest storage – end-to-end latency)". - Under normal conditions when ingester is processing records faster than records are appearing, receive delay should be decreasing, until `-ingest-storage.kafka.max-consumer-lag-at-startup` is honored. - When ingester is starting, and observed "receive delay" is increasing, alert is raised. diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 86ca6164bde..e3e95d79924 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -207,8 +207,10 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). - // The timeout is equal to the max lag. This is done because we expect at least a 1x replay speed - // from Kafka (which means at most it takes 1s to ingest 1s of data). + // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed + // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously + // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data + // written in the meanwhile. func() (time.Duration, error) { timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) defer cancel() From 2a6a7af09dc6cd4369cb7da86923416d7803c297 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 3 Jul 2024 08:33:48 +0200 Subject: [PATCH 5/6] Fix CHANGELOG Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 074629dc58a..4b7385ea14e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,6 @@ * [CHANGE] Querier: return only samples within the queried start/end time range when executing a remote read request using "SAMPLES" mode. Previously, samples outside of the range could have been returned. Samples outside of the queried time range may still be returned when executing a remote read request using "STREAMED_XOR_CHUNKS" mode. #8463 * [CHANGE] Store-gateway: enabled `-blocks-storage.bucket-store.max-concurrent-queue-timeout` by default with a timeout of 5 seconds. #8496 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 -* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. From 64fdea7d4ffe7773da5129e35e39153b94cee809 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 3 Jul 2024 08:35:04 +0200 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- docs/sources/mimir/manage/mimir-runbooks/_index.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index c66dc52993f..7372015f202 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1388,9 +1388,9 @@ This alert fires when "receive delay" reported by ingester during "starting" pha How it **works**: -- When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The are two configuration options controlling the desired and tolerated lag before a ingester is considered to have caught up reading from a partition at startup: - - `-ingest-storage.kafka.max-consumer-lag-at-startup`: this is the guaranteed maximum lag before a ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't passes the readiness check until the measured lag is below this setting. - - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that a ingester should try to achieve at startup. This setting is a best-effort. The ingester is granted a "grace period" to have the measured lag below this setting, but the ingester starts anyway if the target lag hasn't been reached within the "grace period", as far as the max lag is honored. The "grace period" is equal to the configured `-ingest-storage.kafka.max-consumer-lag-at-startup`. +- When an ingester starts, it needs to fetch and process records from Kafka until a preconfigured consumption lag is honored. There are two configuration options that control the lag before an ingester is considered to have caught up reading from a partition at startup: + - `-ingest-storage.kafka.max-consumer-lag-at-startup`: this is the guaranteed maximum lag before an ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't pass the readiness check until the measured lag is below this setting. + - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that an ingester sets to achieve at startup. This setting is a best-effort. The ingester is granted a "grace period" to have the measured lag below this setting. However, the ingester still starts if the target lag hasn't been reached within this "grace period", as long as the max lag is honored. The "grace period" is equal to the configured `-ingest-storage.kafka.max-consumer-lag-at-startup`. - Each record has a timestamp when it was sent to Kafka by the distributor. When ingester reads the record, it computes "receive delay" as a difference between current time (when record was read) and time when record was sent to Kafka. This receive delay is reported in the metric `cortex_ingest_storage_reader_receive_delay_seconds`. You can see receive delay on `Mimir / Writes` dashboard, in section "Ingester (ingest storage – end-to-end latency)". - Under normal conditions when ingester is processing records faster than records are appearing, receive delay should be decreasing, until `-ingest-storage.kafka.max-consumer-lag-at-startup` is honored. - When ingester is starting, and observed "receive delay" is increasing, alert is raised.