Add -ingest-storage.kafka.target-consumer-lag-at-startup support #8579

Merged (6 commits) on Jul 3, 2024
CHANGELOG.md (1 addition, 1 deletion)

@@ -9,7 +9,7 @@
* [CHANGE] Querier: return only samples within the queried start/end time range when executing a remote read request using "SAMPLES" mode. Previously, samples outside of the range could have been returned. Samples outside of the queried time range may still be returned when executing a remote read request using "STREAMED_XOR_CHUNKS" mode. #8463
* [CHANGE] Store-gateway: enabled `-blocks-storage.bucket-store.max-concurrent-queue-timeout` by default with a timeout of 5 seconds. #8496
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508
-* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542
+* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and ingesters asynchronously replay the ingested data from Kafka. In this architecture, the write and read paths are decoupled through the Kafka-compatible backend: the write-path and Kafka load is a function of incoming write traffic, while the read-path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path (see the sketch after this hunk).
* New configuration options:
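The decoupling described in this changelog entry can be pictured with a minimal sketch using franz-go, the Kafka client that Mimir's ingest storage builds on. The broker address, topic name, and payload below are illustrative placeholders, not Mimir's actual wiring:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	// Write path: a distributor-like producer appends incoming write
	// requests to a Kafka topic; its load depends only on write traffic.
	producer, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
	producer.Produce(ctx, &kgo.Record{Topic: "ingest", Value: []byte("write request")}, nil)
	if err := producer.Flush(ctx); err != nil {
		log.Fatal(err)
	}

	// Read path: an ingester-like consumer replays the same topic
	// asynchronously, at its own pace, without touching the write path.
	consumer, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("ingest"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	fetches := consumer.PollFetches(ctx)
	fetches.EachRecord(func(r *kgo.Record) {
		log.Printf("replayed record produced at %s", r.Timestamp)
	})
}
```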
cmd/mimir/config-descriptor.json (11 additions, 1 deletion)

@@ -6578,11 +6578,21 @@
"fieldFlag": "ingest-storage.kafka.consume-from-timestamp-at-startup",
"fieldType": "int"
},
{
"kind": "field",
"name": "target_consumer_lag_at_startup",
"required": false,
"desc": "The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.",
"fieldValue": null,
"fieldDefaultValue": 2000000000,
"fieldFlag": "ingest-storage.kafka.target-consumer-lag-at-startup",
"fieldType": "duration"
},
{
"kind": "field",
"name": "max_consumer_lag_at_startup",
"required": false,
"desc": "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.",
"desc": "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.",
"fieldValue": null,
"fieldDefaultValue": 15000000000,
"fieldFlag": "ingest-storage.kafka.max-consumer-lag-at-startup",
cmd/mimir/help-all.txt.tmpl (3 additions, 1 deletion)

@@ -1332,9 +1332,11 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
-The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of the data in a Kafka record generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend not changing this setting except for testing purposes. (default 15983616)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
cmd/mimir/help.txt.tmpl (3 additions, 1 deletion)

@@ -414,9 +414,11 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.last-produced-offset-retry-timeout duration
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
-The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of the data in a Kafka record generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend not changing this setting except for testing purposes. (default 15983616)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
docs/sources/mimir/configure/configuration-parameters/index.md (12 additions, 3 deletions)

@@ -3763,10 +3763,19 @@ kafka:
# CLI flag: -ingest-storage.kafka.consume-from-timestamp-at-startup
[consume_from_timestamp_at_startup: <int> | default = 0]

-# The maximum tolerated lag before a consumer is considered to have caught up
+# The best-effort maximum lag a consumer tries to achieve at startup. Set both
+# -ingest-storage.kafka.target-consumer-lag-at-startup and
+# -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting
+# for maximum consumer lag being honored at startup.
+# CLI flag: -ingest-storage.kafka.target-consumer-lag-at-startup
+[target_consumer_lag_at_startup: <duration> | default = 2s]
+
+# The guaranteed maximum lag before a consumer is considered to have caught up
# reading from a partition at startup, becomes ACTIVE in the hash ring and
-# passes the readiness check. Set 0 to disable waiting for maximum consumer
-# lag being honored at startup.
+# passes the readiness check. Set both
+# -ingest-storage.kafka.target-consumer-lag-at-startup and
+# -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting
+# for maximum consumer lag being honored at startup.
# CLI flag: -ingest-storage.kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

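As a quick illustration of the disable semantics documented above, here is a minimal sketch that parses and validates a configuration with both lag settings set to 0. It assumes the `github.com/grafana/mimir/pkg/storage/ingest` import path and the `-ingest-storage.kafka.address` flag name; both are assumptions, not confirmed by this diff:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.Config
	fs := flag.NewFlagSet("mimir", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// Both settings must be zero (or both non-zero): setting only one of
	// them fails validation with ErrInconsistentConsumerLagAtStartup.
	_ = fs.Parse([]string{
		"-ingest-storage.kafka.address=localhost:9092",
		"-ingest-storage.kafka.topic=ingest",
		"-ingest-storage.kafka.target-consumer-lag-at-startup=0",
		"-ingest-storage.kafka.max-consumer-lag-at-startup=0",
	})

	// Waiting for consumer lag at startup is now disabled.
	fmt.Println(cfg.KafkaConfig.Validate()) // <nil>
}
```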
docs/sources/mimir/manage/mimir-runbooks/_index.md (3 additions, 1 deletion)

@@ -1388,7 +1388,9 @@ This alert fires when "receive delay" reported by ingester during "starting" phase

How it **works**:

-- When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The maximum tolerated lag before an ingester is considered to have caught up reading from a partition at startup can be configured via `-ingest-storage.kafka.max-consumer-lag-at-startup`.
+- When an ingester starts, it needs to fetch and process records from Kafka until a preconfigured consumption lag is honored. Two configuration options control the lag before an ingester is considered to have caught up reading from a partition at startup:
+  - `-ingest-storage.kafka.max-consumer-lag-at-startup`: the guaranteed maximum lag before an ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't pass the readiness check until the measured lag is below this setting.
+  - `-ingest-storage.kafka.target-consumer-lag-at-startup`: the desired maximum lag an ingester aims to achieve at startup. This setting is best-effort: the ingester is granted a "grace period" to get the measured lag below this setting. However, the ingester still starts if the target lag hasn't been reached within this "grace period", as long as the max lag is honored. The "grace period" is equal to the configured `-ingest-storage.kafka.max-consumer-lag-at-startup` (see the sketch after this section).
- Each record carries the timestamp of when the distributor sent it to Kafka. When the ingester reads a record, it computes the "receive delay" as the difference between the current time (when the record was read) and the time when the record was sent to Kafka. This receive delay is reported in the metric `cortex_ingest_storage_reader_receive_delay_seconds`. You can see the receive delay on the `Mimir / Writes` dashboard, in the section "Ingester (ingest storage – end-to-end latency)".
- Under normal conditions, when the ingester processes records faster than they are appended, the receive delay decreases until `-ingest-storage.kafka.max-consumer-lag-at-startup` is honored.
- The alert fires when the ingester is starting and the observed "receive delay" is increasing.
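The interplay between the two settings can be summarized with a simplified sketch of the documented semantics. This is an illustration, not Mimir's actual startup code; `fetchAndProcess` and `measuredLag` are hypothetical helpers:

```go
package main

import "time"

// Hypothetical helpers, stubbed for illustration only.
func fetchAndProcess()           {}
func measuredLag() time.Duration { return 0 }

// catchUpAtStartup replays records until the consumer lag is low enough
// for the ingester to become ACTIVE in the ring and pass readiness.
func catchUpAtStartup(targetLag, maxLag time.Duration) {
	if targetLag == 0 && maxLag == 0 {
		return // waiting for consumer lag at startup is disabled
	}

	// Best-effort phase: try to get below the target lag within a
	// "grace period" equal to the configured max lag.
	deadline := time.Now().Add(maxLag)
	for time.Now().Before(deadline) {
		fetchAndProcess()
		if measuredLag() <= targetLag {
			return // target honored: start immediately
		}
	}

	// Guaranteed phase: the grace period expired without reaching the
	// target, so keep consuming until at least the max lag is honored.
	for measuredLag() > maxLag {
		fetchAndProcess()
	}
}

func main() {
	catchUpAtStartup(2*time.Second, 15*time.Second) // the defaults
}
```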
pkg/storage/ingest/config.go (19 additions, 2 deletions)

@@ -17,6 +17,10 @@ const (
consumeFromStart = "start"
consumeFromEnd = "end"
consumeFromTimestamp = "timestamp"

kafkaConfigFlagPrefix = "ingest-storage.kafka"
targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup"
maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup"
)

var (
@@ -25,6 +29,8 @@
ErrInvalidWriteClients = errors.New("the configured number of write clients is invalid (must be greater than 0)")
ErrInvalidConsumePosition = errors.New("the configured consume position is invalid")
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
ErrInconsistentConsumerLagAtStartup = fmt.Errorf("the target and max consumer lag at startup must either both be set to 0 or both be set to a value greater than 0")
ErrInvalidMaxConsumerLagAtStartup = fmt.Errorf("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag")

consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp}
)
@@ -38,7 +44,7 @@
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.")

cfg.KafkaConfig.RegisterFlagsWithPrefix("ingest-storage.kafka", f)
cfg.KafkaConfig.RegisterFlagsWithPrefix(kafkaConfigFlagPrefix, f)
cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration", f)
}

@@ -73,6 +79,7 @@ type KafkaConfig struct {

ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"`
ConsumeFromTimestampAtStartup int64 `yaml:"consume_from_timestamp_at_startup"`
TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`

AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"`
@@ -106,7 +113,11 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+".consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", ")))
f.Int64Var(&cfg.ConsumeFromTimestampAtStartup, prefix+".consume-from-timestamp-at-startup", 0, fmt.Sprintf("Milliseconds timestamp after which the consumption of the partition starts at startup. Only applies when consume-from-position-at-startup is %s", consumeFromTimestamp))
-f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.")
+
+howToDisableConsumerLagAtStartup := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", targetConsumerLagAtStartupFlag, maxConsumerLagAtStartupFlag)
+f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup)
+f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup)

f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.")
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.")

@@ -141,6 +152,12 @@ func (cfg *KafkaConfig) Validate() error {
if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit {
return ErrInvalidProducerMaxRecordSizeBytes
}
if (cfg.TargetConsumerLagAtStartup != 0) != (cfg.MaxConsumerLagAtStartup != 0) {
return ErrInconsistentConsumerLagAtStartup
}
if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup {
return ErrInvalidMaxConsumerLagAtStartup
}

return nil
}
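Because the new checks return sentinel errors, callers can distinguish the two failure modes with `errors.Is`. A minimal sketch, again assuming the `github.com/grafana/mimir/pkg/storage/ingest` import path:

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

func main() {
	var cfg ingest.KafkaConfig
	cfg.RegisterFlagsWithPrefix("ingest-storage.kafka", flag.NewFlagSet("", flag.ContinueOnError))
	cfg.Address = "localhost:9092"
	cfg.Topic = "ingest"

	// Invalid: the target lag must not exceed the max lag.
	cfg.TargetConsumerLagAtStartup = 2 * time.Second
	cfg.MaxConsumerLagAtStartup = time.Second

	err := cfg.Validate()
	fmt.Println(errors.Is(err, ingest.ErrInvalidMaxConsumerLagAtStartup)) // true
}
```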
pkg/storage/ingest/config_test.go (30 additions)

@@ -95,6 +95,36 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: ErrInvalidProducerMaxRecordSizeBytes,
},
"should fail if target consumer lag is enabled but max consumer lag is not": {
setup: func(cfg *Config) {
cfg.Enabled = true
cfg.KafkaConfig.Address = "localhost"
cfg.KafkaConfig.Topic = "test"
cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second
cfg.KafkaConfig.MaxConsumerLagAtStartup = 0
},
expectedErr: ErrInconsistentConsumerLagAtStartup,
},
"should fail if max consumer lag is enabled but target consumer lag is not": {
setup: func(cfg *Config) {
cfg.Enabled = true
cfg.KafkaConfig.Address = "localhost"
cfg.KafkaConfig.Topic = "test"
cfg.KafkaConfig.TargetConsumerLagAtStartup = 0
cfg.KafkaConfig.MaxConsumerLagAtStartup = 2 * time.Second
},
expectedErr: ErrInconsistentConsumerLagAtStartup,
},
"should fail if target consumer lag is > max consumer lag": {
setup: func(cfg *Config) {
cfg.Enabled = true
cfg.KafkaConfig.Address = "localhost"
cfg.KafkaConfig.Topic = "test"
cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second
cfg.KafkaConfig.MaxConsumerLagAtStartup = 1 * time.Second
},
expectedErr: ErrInvalidMaxConsumerLagAtStartup,
},
}

for testName, testData := range tests {