From 7a872fd251b3929ac522757070a82bc7b8cdd8c5 Mon Sep 17 00:00:00 2001
From: Gabriel Mendes
Date: Fri, 23 Apr 2021 15:18:47 -0300
Subject: [PATCH] feat(consumer): add batch send deadline

---
 README.md                | 1 +
 cmd/injector.go          | 1 +
 src/injector/injector.go | 6 ++++++
 src/kafka/config.go      | 1 +
 src/kafka/consumer.go    | 29 ++++++++++++++++++++-------
 5 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/README.md b/README.md
index 9b1d42b..92d8cf2 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
 - `ELASTICSEARCH_DISABLE_SNIFFING` if set to "true", the client will not sniff Elasticsearch nodes during the node discovery process. Defaults to false. **OPTIONAL**
 - `KAFKA_CONSUMER_CONCURRENCY` Number of parallel goroutines working as a consumer. Default value is 1 **OPTIONAL**
 - `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to Elasticsearch (for each goroutine). Default value is 100 **OPTIONAL**
+- `KAFKA_CONSUMER_BATCH_DEADLINE` If no new records are added to the batch for this duration, the pending batch is sent to Elasticsearch (for each goroutine). Default value is 1m **OPTIONAL**
 - `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
 - `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to Elasticsearch. Defaults to empty string. **OPTIONAL**
 - `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
diff --git a/cmd/injector.go b/cmd/injector.go
index 9e3f94b..c996bac 100644
--- a/cmd/injector.go
+++ b/cmd/injector.go
@@ -42,6 +42,7 @@ func main() {
 		ConsumerGroup:         os.Getenv("KAFKA_CONSUMER_GROUP"),
 		Concurrency:           os.Getenv("KAFKA_CONSUMER_CONCURRENCY"),
 		BatchSize:             os.Getenv("KAFKA_CONSUMER_BATCH_SIZE"),
+		BatchDeadline:         os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"),
 		BufferSize:            os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
 		MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
 		RecordType:            os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),
diff --git a/src/injector/injector.go b/src/injector/injector.go
index b26d3d9..7d2b63e 100644
--- a/src/injector/injector.go
+++ b/src/injector/injector.go
@@ -22,6 +22,11 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		level.Warn(logger).Log("err", err, "message", "failed to get consumer batch size")
 		batchSize = 100
 	}
+	batchDeadline, err := time.ParseDuration(kafkaConfig.BatchDeadline)
+	if err != nil {
+		level.Warn(logger).Log("err", err, "message", "failed to get consumer batch deadline")
+		batchDeadline = time.Minute
+	}
 	metricsUpdateInterval, err := time.ParseDuration(kafkaConfig.MetricsUpdateInterval)
 	if err != nil {
 		level.Warn(logger).Log("err", err, "message", "failed to get consumer metrics update interval")
@@ -54,6 +59,7 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
 		Logger:                logger,
 		Concurrency:           concurrency,
 		BatchSize:             batchSize,
+		BatchDeadline:         batchDeadline,
 		MetricsUpdateInterval: metricsUpdateInterval,
 		BufferSize:            bufferSize,
 		IncludeKey:            includeKey,
diff --git a/src/kafka/config.go b/src/kafka/config.go
index f41351b..4b719c1 100644
--- a/src/kafka/config.go
+++ b/src/kafka/config.go
@@ -10,6 +10,7 @@ type Config struct {
 	ConsumerGroup         string
 	Concurrency           string
 	BatchSize             string
+	BatchDeadline         string
 	MetricsUpdateInterval string
 	BufferSize            string
 	RecordType            string
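Note (not part of the patch): `KAFKA_CONSUMER_BATCH_DEADLINE` is parsed with Go's `time.ParseDuration`, so any Go duration string works ("30s", "2m30s", "1h"); an unset or unparsable value logs a warning and falls back to one minute. A minimal standalone sketch of that fallback, assuming a hypothetical helper `parseBatchDeadline` (the patch inlines this logic in `MakeKafkaConsumer`):

```go
package main

import (
	"fmt"
	"time"
)

// parseBatchDeadline mirrors the patch's warn-and-default pattern:
// any value time.ParseDuration rejects (including the empty string
// from an unset env var) falls back to one minute.
func parseBatchDeadline(raw string) time.Duration {
	d, err := time.ParseDuration(raw)
	if err != nil {
		return time.Minute
	}
	return d
}

func main() {
	fmt.Println(parseBatchDeadline("30s"))   // 30s
	fmt.Println(parseBatchDeadline("2m30s")) // 2m30s
	fmt.Println(parseBatchDeadline(""))      // 1m0s (fallback)
}
```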
diff --git a/src/kafka/consumer.go b/src/kafka/consumer.go
index df942a9..1d46584 100644
--- a/src/kafka/consumer.go
+++ b/src/kafka/consumer.go
@@ -2,8 +2,8 @@ package kafka
 
 import (
 	"context"
-	"os"
 	"errors"
+	"os"
 	"time"
 
 	"github.com/Shopify/sarama"
@@ -12,9 +12,9 @@ import (
 	"github.com/go-kit/kit/endpoint"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 	"github.com/inloco/kafka-elasticsearch-injector/src/metrics"
 	"github.com/inloco/kafka-elasticsearch-injector/src/models"
-	e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 )
 
 type Notification int32
@@ -41,6 +41,7 @@ type Consumer struct {
 	Logger                log.Logger
 	Concurrency           int
 	BatchSize             int
+	BatchDeadline         time.Duration
 	MetricsUpdateInterval time.Duration
 	BufferSize            int
 	IncludeKey            bool
@@ -80,8 +81,9 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
 	defer consumer.Close()
 
 	buffSize := k.consumer.BatchSize
+	batchDeadline := k.consumer.BatchDeadline
 	for i := 0; i < concurrency; i++ {
-		go k.worker(consumer, buffSize, notifications)
+		go k.worker(consumer, buffSize, batchDeadline, notifications)
 	}
 	go func() {
 		for {
@@ -134,15 +136,24 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
 	}
 }
 
-func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications chan<- Notification) {
+func batchDeadlineExceeded(lastReceivedMsg *time.Time, batchDeadline time.Duration) bool {
+	if lastReceivedMsg == nil {
+		return false
+	}
+
+	return time.Since(*lastReceivedMsg) > batchDeadline
+}
+
+func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, batchDeadline time.Duration, notifications chan<- Notification) {
 	buf := make([]*sarama.ConsumerMessage, buffSize)
 	var decoded []*models.Record
+	var lastReceivedMsg *time.Time
 	idx := 0
 	for {
 		kafkaMsg := <-k.consumerCh
 		buf[idx] = kafkaMsg
 		idx++
-		for idx == buffSize {
+		if idx == buffSize || batchDeadlineExceeded(lastReceivedMsg, batchDeadline) {
 			if decoded == nil {
-				for _, msg := range buf {
+				for _, msg := range buf[:idx] { // a deadline flush may carry a partial batch
 					req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
@@ -171,8 +182,12 @@ func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications c
-			for _, msg := range buf {
+			for _, msg := range buf[:idx] {
 				consumer.MarkOffset(msg, "") // mark message as processed
 			}
 			decoded = nil
+			lastReceivedMsg = nil
 			idx = 0
+		} else {
+			now := time.Now()
+			lastReceivedMsg = &now // track arrival time of the newest buffered message
 		}
 	}
 }
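Review note (not part of the patch): the deadline is only evaluated when the next message arrives, because `worker` blocks on `<-k.consumerCh`. A batch sitting on an idle topic therefore waits for one more record before it is flushed. If idle batches must also be flushed, a `select` over the channel plus a `time.Ticker` handles it; below is a minimal, self-contained sketch of that alternative, with `string` messages and an injected `flush` callback standing in for the decode/insert/mark-offset pipeline:

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop accumulates messages into batches of at most batchSize and
// flushes a partial batch whenever batchDeadline elapses with no flush,
// even if no further messages arrive.
func batchLoop(msgs <-chan string, batchSize int, batchDeadline time.Duration, flush func([]string)) {
	buf := make([]string, 0, batchSize)
	ticker := time.NewTicker(batchDeadline)
	defer ticker.Stop()
	for {
		select {
		case msg, ok := <-msgs:
			if !ok { // channel closed: flush what is left and stop
				if len(buf) > 0 {
					flush(buf)
				}
				return
			}
			buf = append(buf, msg)
			if len(buf) == batchSize { // full batch: send immediately
				flush(buf)
				buf = buf[:0] // copy first if flush keeps the slice
				ticker.Reset(batchDeadline) // Go 1.15+
			}
		case <-ticker.C: // deadline passed: send partial batch, if any
			if len(buf) > 0 {
				flush(buf)
				buf = buf[:0]
			}
		}
	}
}

func main() {
	msgs := make(chan string)
	go func() {
		for _, m := range []string{"a", "b", "c"} {
			msgs <- m
		}
		time.Sleep(150 * time.Millisecond) // idle gap longer than the deadline
		close(msgs)
	}()
	batchLoop(msgs, 2, 100*time.Millisecond, func(batch []string) {
		fmt.Printf("flush: %v\n", batch)
	})
	// prints: flush: [a b], then flush: [c] once the deadline expires
}
```

Either trigger (full buffer or expired ticker) funnels through the same flush path, which keeps the batching and offset-marking logic in one place.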