Skip to content

Commit

Permalink
Verify main and retry topics by default
Browse files Browse the repository at this point in the history
  • Loading branch information
kutay-o committed Jan 2, 2025
1 parent d234e31 commit 9327066
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 58 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryEnabled` | Retry/Exception consumer is working or not | false |
| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true |
| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
| `verifyTopicOnStartup` | it checks existence of the given topic(s) on the kafka cluster. | false |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | 1s |
Expand Down Expand Up @@ -265,7 +264,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `retryConfiguration.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | 100 |
| `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
Expand Down
32 changes: 19 additions & 13 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,12 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
log := NewZapLogger(cfg.LogLevel)

if cfg.VerifyTopicOnStartup {
kclient, err := newKafkaClient(cfg)
if err != nil {
return nil, err
}
exist, err := verifyTopics(kclient, cfg)
if err != nil {
return nil, err
}
if !exist {
return nil, fmt.Errorf("topics %s does not exist, please check cluster authority etc", cfg.getTopics())
}
log.Infof("Topic [%s] verified successfully!", cfg.getTopics())
if err := verifyTopicOnStartup(cfg); err != nil {
return nil, err
}

log.Infof("Topic [%s] verified successfully!", cfg.getTopics())

reader, err := cfg.newKafkaReader()
if err != nil {
log.Errorf("Error when initializing kafka reader %v", err)
Expand Down Expand Up @@ -151,6 +142,21 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
return &c, nil
}

func verifyTopicOnStartup(cfg *ConsumerConfig) error {
kclient, err := newKafkaClient(cfg)
if err != nil {
return err
}
exist, err := verifyTopics(kclient, cfg)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("topics %s does not exist, please check cluster authority etc", cfg.getTopics())
}
return nil
}

func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Message) error) {
c.logger.Debug("Initializing Cronsumer")
c.retryTopic = cfg.RetryConfiguration.Topic
Expand Down
47 changes: 22 additions & 25 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type ConsumerConfig struct {
RetryConfiguration RetryConfiguration
LogLevel LogLevel
Rack string
VerifyTopicOnStartup bool
ClientID string
Reader ReaderConfig
CommitInterval time.Duration
Expand All @@ -68,9 +67,9 @@ type ConsumerConfig struct {

func (cfg RetryConfiguration) JSON() string {
return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, `+
`"MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`,
`"MaxRetry": %d, "Rack": %q}`,
strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron,
cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack)
cfg.WorkDuration, cfg.MaxRetry, cfg.Rack)
}

func (cfg *BatchConfiguration) JSON() string {
Expand Down Expand Up @@ -99,10 +98,10 @@ func (cfg *ConsumerConfig) JSON() string {
}
return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, `+
`"TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, `+
`"VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`,
`"Rack": %q, "SASL": %s, "TLS": %s}`,
cfg.ClientID, cfg.Reader.JSON(), cfg.BatchConfiguration.JSON(),
cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency,
cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.VerifyTopicOnStartup,
cfg.RetryEnabled, cfg.RetryConfiguration.JSON(),
cfg.Rack, cfg.SASL.JSON(), cfg.TLS.JSON())
}

Expand All @@ -123,25 +122,24 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
ClientID: cfg.RetryConfiguration.ClientID,
Brokers: cfg.RetryConfiguration.Brokers,
Consumer: kcronsumer.ConsumerConfig{
ClientID: cfg.ClientID,
GroupID: cfg.Reader.GroupID,
Topic: cfg.RetryConfiguration.Topic,
DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic,
Cron: cfg.RetryConfiguration.StartTimeCron,
Duration: cfg.RetryConfiguration.WorkDuration,
MaxRetry: cfg.RetryConfiguration.MaxRetry,
VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup,
Concurrency: cfg.RetryConfiguration.Concurrency,
QueueCapacity: cfg.RetryConfiguration.QueueCapacity,
MinBytes: cfg.Reader.MinBytes,
MaxBytes: cfg.Reader.MaxBytes,
MaxWait: cfg.Reader.MaxWait,
CommitInterval: cfg.Reader.CommitInterval,
HeartbeatInterval: cfg.Reader.HeartbeatInterval,
SessionTimeout: cfg.Reader.SessionTimeout,
RebalanceTimeout: cfg.Reader.RebalanceTimeout,
StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
RetentionTime: cfg.Reader.RetentionTime,
ClientID: cfg.ClientID,
GroupID: cfg.Reader.GroupID,
Topic: cfg.RetryConfiguration.Topic,
DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic,
Cron: cfg.RetryConfiguration.StartTimeCron,
Duration: cfg.RetryConfiguration.WorkDuration,
MaxRetry: cfg.RetryConfiguration.MaxRetry,
Concurrency: cfg.RetryConfiguration.Concurrency,
QueueCapacity: cfg.RetryConfiguration.QueueCapacity,
MinBytes: cfg.Reader.MinBytes,
MaxBytes: cfg.Reader.MaxBytes,
MaxWait: cfg.Reader.MaxWait,
CommitInterval: cfg.Reader.CommitInterval,
HeartbeatInterval: cfg.Reader.HeartbeatInterval,
SessionTimeout: cfg.Reader.SessionTimeout,
RebalanceTimeout: cfg.Reader.RebalanceTimeout,
StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
RetentionTime: cfg.Reader.RetentionTime,
},
Producer: kcronsumer.ProducerConfig{
Balancer: cfg.RetryConfiguration.Balancer,
Expand Down Expand Up @@ -226,7 +224,6 @@ type RetryConfiguration struct {
Topic string
DeadLetterTopic string
Rack string
VerifyTopicOnStartup bool
LogLevel LogLevel
Brokers []string
Balancer Balancer
Expand Down
31 changes: 13 additions & 18 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ func TestConsumerConfig_JSON(t *testing.T) {
"\"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, " +
"\"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, " +
"\"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", " +
"\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, " +
"\"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", " +
"\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"Rack\": \"\"}, " +
"\"Rack\": \"stage\", " +
"\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " +
"\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}"
// When
Expand All @@ -270,7 +270,7 @@ func TestConsumerConfig_JSON(t *testing.T) {
"\"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, " +
"\"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, " +
"\"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", " +
"\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, " +
"\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"Rack\": \"\"}, " +
"\"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}"
// When
result := getConsumerConfigWithoutInnerObjectExample().JSON()
Expand All @@ -289,8 +289,8 @@ func TestConsumerConfig_String(t *testing.T) {
"StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", " +
"TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, " +
"RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", " +
"StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, " +
"VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " +
"StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, Rack: \"\"}, " +
"Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " +
"TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}"
// When
result := getConsumerConfigExample().String()
Expand All @@ -305,7 +305,7 @@ func TestConsumerConfig_String(t *testing.T) {
"GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, " +
"BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, " +
"RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", " +
"MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}"
"MaxRetry: 0, Rack: \"\"}, Rack: \"stage\", SASL: {}, TLS: {}"
// When
result := getConsumerConfigWithoutInnerObjectExample().String()
// Then
Expand All @@ -326,8 +326,7 @@ func TestConsumerConfig_JSONPretty(t *testing.T) {
"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"" +
"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" +
"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"" +
"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"" +
"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"" +
"MaxRetry\": 3,\n\t\t\"Rack\": \"\"\n\t},\n\t\"Rack\": \"stage\",\n\t\"" +
"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" +
"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}"
// When
Expand All @@ -346,8 +345,7 @@ func TestConsumerConfig_JSONPretty(t *testing.T) {
"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"" +
"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"" +
"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"" +
"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"" +
"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}"
"Rack\": \"\"\n\t},\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}"
// When
result := getConsumerConfigWithoutInnerObjectExample().JSONPretty()
// Then
Expand Down Expand Up @@ -418,14 +416,12 @@ func getConsumerConfigExample() *ConsumerConfig {
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
Topic: "test-exception.0",
StartTimeCron: "*/2 * * * *",
WorkDuration: time.Minute * 1,
MaxRetry: 3,
VerifyTopicOnStartup: true,
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
Topic: "test-exception.0",
StartTimeCron: "*/2 * * * *",
WorkDuration: time.Minute * 1,
MaxRetry: 3,
},
VerifyTopicOnStartup: true,
TLS: &TLSConfig{
RootCAPath: "resources/ca",
IntermediateCAPath: "resources/intCa",
Expand All @@ -448,6 +444,5 @@ func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig {
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{},
VerifyTopicOnStartup: true,
}
}

0 comments on commit 9327066

Please sign in to comment.