
Commit

chore: Allow configuring fetch.message.max.bytes and queued.max.messages.kbytes for kafka consumer
huantt committed Dec 15, 2022
1 parent efed011 commit afd1550
Showing 4 changed files with 20 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -21,6 +21,7 @@ Usage:

Flags:
--concurrent-consumers int Number of concurrent consumers (default 1)
--fetch-message-max-bytes int Maximum number of bytes per topic+partition to request when fetching messages from the broker. (default 1048576)
-f, --file string Output file path (required)
--gcs-bucket string Google Cloud Storage bucket name
--gcs-project-id string Google Cloud Storage Project ID
@@ -35,6 +36,7 @@ Flags:
--kafka-username string Kafka username
--limit uint Supports file splitting. Files are split by the number of messages specified
--max-waiting-seconds-for-new-message int Max waiting seconds for a new message, after which this process will be marked as finished. Set -1 to wait forever. (default 30)
--queued-max-messages-kbytes int Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes (default 128000)
--storage string Storage type: local file (file) or Google cloud storage (gcs) (default "file")

Global Flags:
4 changes: 4 additions & 0 deletions cmd/exporter.go
@@ -23,6 +23,8 @@ func CreateExportCommand() (*cobra.Command, error) {
	var kafkaPassword string
	var kafkaSecurityProtocol string
	var kafkaSASKMechanism string
	var queuedMaxMessagesKbytes int64
	var fetchMessageMaxBytes int64
	var kafkaGroupID string
	var topics *[]string
	var exportLimitPerFile uint64
@@ -116,6 +118,8 @@ func CreateExportCommand() (*cobra.Command, error) {
	command.Flags().Uint64Var(&exportLimitPerFile, "limit", 0, "Supports file splitting. Files are split by the number of messages specified")
	command.Flags().IntVar(&maxWaitingSecondsForNewMessage, "max-waiting-seconds-for-new-message", 30, "Max waiting seconds for a new message, after which this process will be marked as finished. Set -1 to wait forever.")
	command.Flags().IntVar(&concurrentConsumers, "concurrent-consumers", 1, "Number of concurrent consumers")
	command.Flags().Int64Var(&queuedMaxMessagesKbytes, "queued-max-messages-kbytes", 128000, "Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes")
	command.Flags().Int64Var(&fetchMessageMaxBytes, "fetch-message-max-bytes", 1048576, "Maximum number of bytes per topic+partition to request when fetching messages from the broker.")
	topics = command.Flags().StringArray("kafka-topics", nil, "Kafka topics")
	command.MarkFlagsRequiredTogether("kafka-username", "kafka-password", "kafka-sasl-mechanism", "kafka-security-protocol")
	command.MarkFlagsRequiredTogether("google-credentials", "gcs-bucket", "gcs-project-id")
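The hunks above only declare and register the new flags; the code that hands their values to the consumer is outside this diff. A hedged sketch of that wiring, assuming the export command builds a kafka_utils.Config in its RunE and using only the fields visible in this commit:

// Hedged sketch, not part of this commit: the actual RunE body and any
// Config fields beyond those shown in pkg/kafka_utils/config.go are omitted.
command.RunE = func(cmd *cobra.Command, args []string) error {
	kafkaConfig := kafka_utils.Config{
		GroupId:                 kafkaGroupID,
		QueuedMaxMessagesKbytes: queuedMaxMessagesKbytes,
		FetchMessageMaxBytes:    fetchMessageMaxBytes,
	}
	consumer, err := kafka_utils.NewConsumer(kafkaConfig)
	if err != nil {
		return err
	}
	defer consumer.Close()
	// ...consume and export messages here...
	return nil
}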
2 changes: 2 additions & 0 deletions pkg/kafka_utils/config.go
@@ -9,4 +9,6 @@ type Config struct {
	ReadTimeoutSeconds        int16  `json:"read_timeout_seconds" mapstructure:"read_timeout_seconds"`
	GroupId                   string `json:"group_id" mapstructure:"group_id"`
	QueueBufferingMaxMessages int    `json:"queue_buffering_max_messages" mapstructure:"queue_buffering_max_messages"`
	QueuedMaxMessagesKbytes   int64  `json:"queued_max_messages_kbytes" mapstructure:"queued_max_messages_kbytes"`
	FetchMessageMaxBytes      int64  `json:"fetch_message_max_bytes" mapstructure:"fetch_message_max_bytes"`
}
12 changes: 12 additions & 0 deletions pkg/kafka_utils/consumer.go
@@ -27,6 +27,18 @@ func NewConsumer(cfg Config) (*kafka.Consumer, error) {
			return nil, err
		}
	}
	if cfg.FetchMessageMaxBytes > 0 {
		err := config.SetKey("fetch.message.max.bytes", cfg.FetchMessageMaxBytes)
		if err != nil {
			return nil, err
		}
	}
	if cfg.QueuedMaxMessagesKbytes > 0 {
		err := config.SetKey("queued.max.messages.kbytes", cfg.QueuedMaxMessagesKbytes)
		if err != nil {
			return nil, err
		}
	}
	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		return nil, err
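For reference, a minimal, hedged usage sketch of the updated NewConsumer. It sets only the fields visible in this commit, so bootstrap servers and any auth settings are assumed to be filled in on Config elsewhere, and the module import path is an assumption:

package main

import (
	"log"

	"github.com/huantt/kafka-dump/pkg/kafka_utils" // assumed module path
)

func main() {
	cfg := kafka_utils.Config{
		GroupId:                 "kafka-dump-example", // placeholder
		FetchMessageMaxBytes:    1048576,              // librdkafka default, same as the new flag default
		QueuedMaxMessagesKbytes: 128000,               // same as the --queued-max-messages-kbytes default
	}

	// NewConsumer applies fetch.message.max.bytes and queued.max.messages.kbytes
	// via config.SetKey only when the corresponding field is > 0.
	consumer, err := kafka_utils.NewConsumer(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}

Guarding each SetKey call on a positive value keeps librdkafka's own defaults in effect when the flags are left unset.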
