output/kafka: support other compression types
parsa97 committed Jun 29, 2024
1 parent 3128671 commit bff943b
Showing 4 changed files with 21 additions and 6 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -446,8 +446,8 @@ Note that command line arguments are case-insensitive as of v0.9.5
# Interval between sending results to Kafka if Batch size is not filled
--kafkabatchdelay=1s

# Compress Kafka connection
--kafkacompress
# Compress Kafka connection [snappy, gzip, lz4, zstd]; an empty value or 'false' disables compression.
--kafkacompress=false

# Use TLS for kafka connection
--kafkasecure
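The option tags in the kafka.go hunk further down (long, env, ini-name, default, choice) look like github.com/jessevdk/go-flags tags, so the sketch below shows roughly how the two compression flags would be parsed and validated under that assumption; the trimmed struct and example arguments are illustrative, not dnsmonster's actual option wiring.

```go
// Hedged sketch: parse only the two compression options with go-flags
// (assumed from the tag style in internal/output/kafka.go).
package main

import (
	"fmt"
	"os"

	flags "github.com/jessevdk/go-flags"
)

type kafkaOptions struct {
	KafkaCompress        bool   `long:"kafkacompress" description:"Compress Kafka connection"`
	KafkaCompressionType string `long:"kafkacompressiontype" default:"snappy" choice:"snappy" choice:"gzip" choice:"lz4" choice:"zstd" description:"Compression type for the Kafka connection"`
}

func main() {
	var opts kafkaOptions
	// Example invocation: dnsmonster --kafkacompress --kafkacompressiontype=zstd ...
	if _, err := flags.ParseArgs(&opts, os.Args[1:]); err != nil {
		os.Exit(1) // go-flags has already printed the parse error
	}
	fmt.Printf("compress=%v type=%s\n", opts.KafkaCompress, opts.KafkaCompressionType)
}
```

Because of the choice tags, a value outside the list (for example --kafkacompressiontype=brotli) is rejected at parse time instead of silently falling through to an uncompressed writer.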
3 changes: 3 additions & 0 deletions config-sample.ini
@@ -206,6 +206,9 @@ kafkabatchdelay = 1s
; Compress Kafka connection
kafkacompress = false

; Compression type [gzip, snappy, lz4, zstd]; default is snappy
kafkacompressiontype = snappy

; Use TLS for kafka connection
kafkasecure = false

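The ini-name tags suggest the same go-flags library reads this file through its IniParser. Below is a minimal, self-contained sketch under that assumption; the [Application Options] section name (go-flags' default group for a struct passed to NewParser) and the inline ini text are illustrative rather than dnsmonster's actual config layout.

```go
// Hedged sketch: read the compression keys from ini text with go-flags' IniParser.
package main

import (
	"fmt"
	"strings"

	flags "github.com/jessevdk/go-flags"
)

type kafkaOptions struct {
	KafkaCompress        bool   `long:"kafkacompress" ini-name:"kafkacompress" description:"Compress Kafka connection"`
	KafkaCompressionType string `long:"kafkacompressiontype" ini-name:"kafkacompressiontype" default:"snappy" choice:"snappy" choice:"gzip" choice:"lz4" choice:"zstd" description:"Compression type"`
}

func main() {
	var opts kafkaOptions
	parser := flags.NewParser(&opts, flags.Default)

	// NewParser registers the struct under the "Application Options" group,
	// which is also the section name the ini parser matches against.
	ini := "[Application Options]\nkafkacompress = true\nkafkacompressiontype = zstd\n"
	if err := flags.NewIniParser(parser).Parse(strings.NewReader(ini)); err != nil {
		panic(err)
	}
	fmt.Printf("compress=%v type=%s\n", opts.KafkaCompress, opts.KafkaCompressionType)
}
```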
2 changes: 1 addition & 1 deletion docs/content/en/docs/Outputs/kafka.md
@@ -33,7 +33,7 @@ KafkaTimeout = 3
KafkaBatchDelay = 1s

; Compress Kafka connection
KafkaCompress = false
KafkaCompress = gzip

; Use TLS for kafka connection
KafkaSecure = false
18 changes: 15 additions & 3 deletions internal/output/kafka.go
@@ -44,6 +44,7 @@ type kafkaConfig struct {
KafkaTimeout uint `long:"kafkatimeout" ini-name:"kafkatimeout" env:"DNSMONSTER_KAFKATIMEOUT" default:"3" description:"Kafka connection timeout in seconds"`
KafkaBatchDelay time.Duration `long:"kafkabatchdelay" ini-name:"kafkabatchdelay" env:"DNSMONSTER_KAFKABATCHDELAY" default:"1s" description:"Interval between sending results to Kafka if Batch size is not filled"`
KafkaCompress bool `long:"kafkacompress" ini-name:"kafkacompress" env:"DNSMONSTER_KAFKACOMPRESS" description:"Compress Kafka connection"`
KafkaCompressionType string `long:"kafkacompressiontype" ini-name:"kafkacompressiontype" env:"DNSMONSTER_KAFKACOMPRESSIONTYPE" default:"snappy" description:"Compression type for the Kafka connection [snappy gzip lz4 zstd]; default is snappy" choice:"snappy" choice:"gzip" choice:"lz4" choice:"zstd"`
KafkaSecure bool `long:"kafkasecure" ini-name:"kafkasecure" env:"DNSMONSTER_KAFKASECURE" description:"Use TLS for kafka connection"`
KafkaCACertificatePath string `long:"kafkacacertificatepath" ini-name:"kafkacacertificatepath" env:"DNSMONSTER_KAFKACACERTIFICATEPATH" default:"" description:"Path of CA certificate that signs Kafka broker certificate"`
KafkaTLSCertificatePath string `long:"kafkatlscertificatepath" ini-name:"kafkatlscertificatepath" env:"DNSMONSTER_KAFKATLSCERTIFICATEPATH" default:"" description:"Path of TLS certificate to present to broker"`
@@ -146,11 +147,22 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer {
		Topic:     kafConfig.KafkaOutputTopic,
		Transport: transport,
	}

	if kafConfig.KafkaCompress {
		kWriter.Compression = kafka.Snappy
		switch kafConfig.KafkaCompressionType {
		case "gzip":
			kWriter.Compression = kafka.Gzip
			log.Info("Kafka using compression: gzip")
		case "snappy":
			kWriter.Compression = kafka.Snappy
			log.Info("Kafka using compression: snappy")
		case "lz4":
			kWriter.Compression = kafka.Lz4
			log.Info("Kafka using compression: lz4")
		case "zstd":
			kWriter.Compression = kafka.Zstd
			log.Info("Kafka using compression: zstd")
		}
	}

	return kWriter
}

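The kafka.Writer, kafka.Gzip, kafka.Snappy, kafka.Lz4 and kafka.Zstd identifiers used above match github.com/segmentio/kafka-go. The sketch below reproduces the same selection logic against that library in isolation; the broker address, topic, and payload are placeholders, not dnsmonster's runtime values.

```go
// Hedged sketch: choose a kafka-go compression codec from a config string,
// mirroring the switch in getWriter above.
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

// compressionFromString returns the codec for a known name; unknown names (and
// the empty string) leave the writer uncompressed, as the switch above does.
func compressionFromString(name string) kafka.Compression {
	switch name {
	case "gzip":
		return kafka.Gzip
	case "snappy":
		return kafka.Snappy
	case "lz4":
		return kafka.Lz4
	case "zstd":
		return kafka.Zstd
	}
	return 0 // no compression
}

func main() {
	w := &kafka.Writer{
		Addr:        kafka.TCP("localhost:9092"), // placeholder broker
		Topic:       "dnsmonster",                // placeholder topic
		Compression: compressionFromString("zstd"),
	}
	defer w.Close()

	if err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("example")}); err != nil {
		log.Fatal(err)
	}
}
```

Since the switch in getWriter has no default case, an unrecognized KafkaCompressionType would leave the writer uncompressed; in practice the choice list on the flag keeps such values from reaching this code.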
