diff --git a/README.md b/README.md index 4ff2434..0f0910a 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryEnabled` | Retry/Exception consumer is working or not | false | | `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | | `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | -| `verifyTopicOnStartup` | it checks existence of the given topic(s) on the kafka cluster. | false | | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | | `messageGroupDuration` | Maximum time to wait for a batch | 1s | @@ -265,7 +264,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | | `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | -| `retryConfiguration.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false | | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | 100 | | `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | | | `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | diff --git a/consumer_base.go b/consumer_base.go index 083e2df..c0abb2d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -98,21 +98,13 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { log := NewZapLogger(cfg.LogLevel) - if cfg.VerifyTopicOnStartup { - kclient, err := newKafkaClient(cfg) - if err != nil { - return nil, err - } - exist, err := verifyTopics(kclient, cfg) - if err != nil { - return nil, err - } - if !exist { - return nil, fmt.Errorf("topics %s does not exist, please check cluster authority etc", cfg.getTopics()) - } - log.Infof("Topic [%s] verified successfully!", cfg.getTopics()) + err := verifyTopicOnStartup(cfg) + if err != nil { + return nil, err } + log.Infof("Topic [%s] verified successfully!", cfg.getTopics()) + reader, err := cfg.newKafkaReader() if err != nil { log.Errorf("Error when initializing kafka reader %v", err) @@ -151,6 +143,38 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { return &c, nil } +func verifyTopicOnStartup(cfg *ConsumerConfig) error { + if err := verifyTopicsWithClient(cfg.Reader.Brokers, cfg.getTopics(), cfg.SASL, cfg.TLS); err != nil { + return err + } + + if cfg.RetryEnabled { + if err := verifyTopicsWithClient(cfg.RetryConfiguration.Brokers, []string{cfg.getRetryTopic()}, cfg.SASL, cfg.TLS); err != nil { + return err + } + } + + return nil +} + +func verifyTopicsWithClient(brokers []string, topics []string, sasl *SASLConfig, tls *TLSConfig) error { + kclient, err := newKafkaClient(brokers, topics, sasl, tls) + if err != nil { + return err + } + + exists, err := verifyTopics(kclient, topics) + if err != nil { + return err + } + + if !exists { + return fmt.Errorf("topics %s does not exist, please check cluster authority etc", topics) + } + + return nil +} + func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Message) error) { c.logger.Debug("Initializing Cronsumer") c.retryTopic = cfg.RetryConfiguration.Topic diff --git a/consumer_config.go b/consumer_config.go index 8c70133..ecb3751 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -48,7 +48,6 @@ type ConsumerConfig struct { RetryConfiguration RetryConfiguration LogLevel LogLevel Rack string - VerifyTopicOnStartup bool ClientID string Reader ReaderConfig CommitInterval time.Duration @@ -68,9 +67,9 @@ type ConsumerConfig struct { func (cfg RetryConfiguration) JSON() string { return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, `+ - `"MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, + `"MaxRetry": %d, "Rack": %q}`, strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron, - cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack) + cfg.WorkDuration, cfg.MaxRetry, cfg.Rack) } func (cfg *BatchConfiguration) JSON() string { @@ -99,10 +98,10 @@ func (cfg *ConsumerConfig) JSON() string { } return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, `+ `"TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, `+ - `"VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, + `"Rack": %q, "SASL": %s, "TLS": %s}`, cfg.ClientID, cfg.Reader.JSON(), cfg.BatchConfiguration.JSON(), cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency, - cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.VerifyTopicOnStartup, + cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.Rack, cfg.SASL.JSON(), cfg.TLS.JSON()) } @@ -123,25 +122,24 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { ClientID: cfg.RetryConfiguration.ClientID, Brokers: cfg.RetryConfiguration.Brokers, Consumer: kcronsumer.ConsumerConfig{ - ClientID: cfg.ClientID, - GroupID: cfg.Reader.GroupID, - Topic: cfg.RetryConfiguration.Topic, - DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic, - Cron: cfg.RetryConfiguration.StartTimeCron, - Duration: cfg.RetryConfiguration.WorkDuration, - MaxRetry: cfg.RetryConfiguration.MaxRetry, - VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup, - Concurrency: cfg.RetryConfiguration.Concurrency, - QueueCapacity: cfg.RetryConfiguration.QueueCapacity, - MinBytes: cfg.Reader.MinBytes, - MaxBytes: cfg.Reader.MaxBytes, - MaxWait: cfg.Reader.MaxWait, - CommitInterval: cfg.Reader.CommitInterval, - HeartbeatInterval: cfg.Reader.HeartbeatInterval, - SessionTimeout: cfg.Reader.SessionTimeout, - RebalanceTimeout: cfg.Reader.RebalanceTimeout, - StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset), - RetentionTime: cfg.Reader.RetentionTime, + ClientID: cfg.ClientID, + GroupID: cfg.Reader.GroupID, + Topic: cfg.RetryConfiguration.Topic, + DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic, + Cron: cfg.RetryConfiguration.StartTimeCron, + Duration: cfg.RetryConfiguration.WorkDuration, + MaxRetry: cfg.RetryConfiguration.MaxRetry, + Concurrency: cfg.RetryConfiguration.Concurrency, + QueueCapacity: cfg.RetryConfiguration.QueueCapacity, + MinBytes: cfg.Reader.MinBytes, + MaxBytes: cfg.Reader.MaxBytes, + MaxWait: cfg.Reader.MaxWait, + CommitInterval: cfg.Reader.CommitInterval, + HeartbeatInterval: cfg.Reader.HeartbeatInterval, + SessionTimeout: cfg.Reader.SessionTimeout, + RebalanceTimeout: cfg.Reader.RebalanceTimeout, + StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset), + RetentionTime: cfg.Reader.RetentionTime, }, Producer: kcronsumer.ProducerConfig{ Balancer: cfg.RetryConfiguration.Balancer, @@ -182,6 +180,10 @@ func (cfg *ConsumerConfig) getTopics() []string { return []string{cfg.Reader.Topic} } +func (cfg *ConsumerConfig) getRetryTopic() string { + return cfg.RetryConfiguration.Topic +} + type APIConfiguration struct { // Port default is 8090 Port *int @@ -226,7 +228,6 @@ type RetryConfiguration struct { Topic string DeadLetterTopic string Rack string - VerifyTopicOnStartup bool LogLevel LogLevel Brokers []string Balancer Balancer diff --git a/consumer_config_test.go b/consumer_config_test.go index 559065e..f0d282c 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -253,8 +253,8 @@ func TestConsumerConfig_JSON(t *testing.T) { "\"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, " + "\"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, " + "\"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", " + - "\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, " + - "\"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", " + + "\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"Rack\": \"\"}, " + + "\"Rack\": \"stage\", " + "\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " + "\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" // When @@ -270,7 +270,7 @@ func TestConsumerConfig_JSON(t *testing.T) { "\"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, " + "\"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, " + "\"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", " + - "\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, " + + "\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"Rack\": \"\"}, " + "\"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" // When result := getConsumerConfigWithoutInnerObjectExample().JSON() @@ -289,8 +289,8 @@ func TestConsumerConfig_String(t *testing.T) { "StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", " + "TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, " + "RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", " + - "StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, " + - "VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + + "StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, Rack: \"\"}, " + + "Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + "TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" // When result := getConsumerConfigExample().String() @@ -305,7 +305,7 @@ func TestConsumerConfig_String(t *testing.T) { "GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, " + "BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, " + "RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", " + - "MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" + "MaxRetry: 0, Rack: \"\"}, Rack: \"stage\", SASL: {}, TLS: {}" // When result := getConsumerConfigWithoutInnerObjectExample().String() // Then @@ -326,8 +326,7 @@ func TestConsumerConfig_JSONPretty(t *testing.T) { "TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"" + "RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + "Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"" + - "MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"" + - "VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"" + + "MaxRetry\": 3,\n\t\t\"Rack\": \"\"\n\t},\n\t\"Rack\": \"stage\",\n\t\"" + "SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" + "TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" // When @@ -346,8 +345,7 @@ func TestConsumerConfig_JSONPretty(t *testing.T) { "MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"" + "RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"" + "Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"" + - "VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"" + - "Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + "Rack\": \"\"\n\t},\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" // When result := getConsumerConfigWithoutInnerObjectExample().JSONPretty() // Then @@ -418,14 +416,12 @@ func getConsumerConfigExample() *ConsumerConfig { Concurrency: 10, RetryEnabled: true, RetryConfiguration: RetryConfiguration{ - Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, - Topic: "test-exception.0", - StartTimeCron: "*/2 * * * *", - WorkDuration: time.Minute * 1, - MaxRetry: 3, - VerifyTopicOnStartup: true, + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Topic: "test-exception.0", + StartTimeCron: "*/2 * * * *", + WorkDuration: time.Minute * 1, + MaxRetry: 3, }, - VerifyTopicOnStartup: true, TLS: &TLSConfig{ RootCAPath: "resources/ca", IntermediateCAPath: "resources/intCa", @@ -448,6 +444,5 @@ func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig { Concurrency: 10, RetryEnabled: true, RetryConfiguration: RetryConfiguration{}, - VerifyTopicOnStartup: true, } } diff --git a/verify_topic.go b/verify_topic.go index 089f672..f048535 100644 --- a/verify_topic.go +++ b/verify_topic.go @@ -16,20 +16,20 @@ type client struct { *kafka.Client } -func newKafkaClient(cfg *ConsumerConfig) (kafkaClient, error) { +func newKafkaClient(brokers []string, topics []string, sasl *SASLConfig, tls *TLSConfig) (kafkaClient, error) { var err error kc := &client{ Client: &kafka.Client{ - Addr: kafka.TCP(cfg.Reader.Brokers...), + Addr: kafka.TCP(brokers...), }, } transport := &Transport{ Transport: &kafka.Transport{ - MetadataTopics: cfg.getTopics(), + MetadataTopics: topics, }, } - if err = fillLayer(transport, cfg.SASL, cfg.TLS); err != nil { + if err = fillLayer(transport, sasl, tls); err != nil { err = fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err) return nil, err } @@ -47,9 +47,7 @@ func (k *client) GetClient() *kafka.Client { return k.Client } -func verifyTopics(client kafkaClient, cfg *ConsumerConfig) (bool, error) { - topics := cfg.getTopics() - +func verifyTopics(client kafkaClient, topics []string) (bool, error) { metadata, err := client.Metadata(context.Background(), &kafka.MetadataRequest{ Topics: topics, }) diff --git a/verify_topic_test.go b/verify_topic_test.go index 0ee1e94..9d0d1da 100644 --- a/verify_topic_test.go +++ b/verify_topic_test.go @@ -46,7 +46,7 @@ func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) { cfg := &ConsumerConfig{} // When - _, err := verifyTopics(mockClient, cfg) + _, err := verifyTopics(mockClient, cfg.getTopics()) // Then if err == nil { @@ -63,7 +63,7 @@ func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) { } // When - exist, err := verifyTopics(mockClient, cfg) + exist, err := verifyTopics(mockClient, cfg.getTopics()) // Then if exist { @@ -83,7 +83,7 @@ func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) { } // When - exist, err := verifyTopics(mockClient, cfg) + exist, err := verifyTopics(mockClient, cfg.getTopics()) // Then if !exist { @@ -105,7 +105,7 @@ func Test_newKafkaClient(t *testing.T) { } // When - client, err := newKafkaClient(cfg) + client, err := newKafkaClient(cfg.Reader.Brokers, cfg.getTopics(), cfg.SASL, cfg.TLS) // Then if client.GetClient().Addr.String() != "127.0.0.1:9092" {