This repository has been archived by the owner on Oct 7, 2022. It is now read-only.

Add deadline on consumer to send non-empty record batch #84

Open · wants to merge 2 commits into `master`
1 change: 1 addition & 0 deletions README.md
@@ -29,6 +29,7 @@ To create new injectors for your topics, you should create a new kubernetes deployment
- `ELASTICSEARCH_DISABLE_SNIFFING` if set to "true", the client will not sniff Elasticsearch nodes during the node discovery process. Defaults to false. **OPTIONAL**
- `KAFKA_CONSUMER_CONCURRENCY` Number of parallel goroutines working as a consumer. Default value is 1 **OPTIONAL**
- `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to Elasticsearch (for each goroutine). Default value is 100 **OPTIONAL**
- `KAFKA_CONSUMER_BATCH_DEADLINE` If no new record is added to the batch within this duration, the batch is sent to Elasticsearch as-is. Default value is 1m **OPTIONAL**
- `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
- `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to Elasticsearch. Defaults to empty string. **OPTIONAL**
- `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
1 change: 1 addition & 0 deletions cmd/injector.go
@@ -42,6 +42,7 @@ func main() {
ConsumerGroup: os.Getenv("KAFKA_CONSUMER_GROUP"),
Concurrency: os.Getenv("KAFKA_CONSUMER_CONCURRENCY"),
BatchSize: os.Getenv("KAFKA_CONSUMER_BATCH_SIZE"),
BatchDeadline: os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"),
BufferSize: os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
RecordType: os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),