Refactor mapHeader function in component/kafka/component.go
mantzas committed Sep 19, 2024
1 parent 5957a75 · commit 0296e39
Showing 3 changed files with 58 additions and 82 deletions.
component/kafka/component.go: 8 changes (0 additions, 8 deletions)
@@ -396,14 +396,6 @@ func getCorrelationID(hh []*sarama.RecordHeader) string {
 	return uuid.New().String()
 }
 
-func mapHeader(hh []*sarama.RecordHeader) map[string]string {
-	mp := make(map[string]string)
-	for _, h := range hh {
-		mp[string(h.Key)] = string(h.Value)
-	}
-	return mp
-}
-
 // deduplicateMessages takes a slice of Messages and de-duplicates the messages based on the Key of those messages.
 // This function assumes that messages are ordered from old to new, and relies on Kafka ordering guarantees within
 // partitions. This is the default behaviour from Kafka unless the Producer altered the partition hashing behaviour in
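For context on the deduplicateMessages doc comment above (its body is hidden in this hunk): below is a minimal, self-contained sketch of keep-the-newest-message-per-key de-duplication under the stated old-to-new ordering assumption. The kafkaMsg type and the dedupeByKey name are illustrative stand-ins, not the package's actual API.

package main

import "fmt"

// kafkaMsg is a hypothetical stand-in for the component's Message type,
// used only for this illustration.
type kafkaMsg struct {
	Key   string
	Value string
}

// dedupeByKey keeps only the newest message per key. It assumes the input is
// ordered from old to new (the Kafka per-partition guarantee the doc comment
// relies on), so a later message simply overwrites an earlier one with the
// same key.
func dedupeByKey(in []kafkaMsg) []kafkaMsg {
	latest := make(map[string]kafkaMsg, len(in))
	order := make([]string, 0, len(in)) // first-seen order of keys
	for _, m := range in {
		if _, seen := latest[m.Key]; !seen {
			order = append(order, m.Key)
		}
		latest[m.Key] = m // newer message wins
	}
	out := make([]kafkaMsg, 0, len(order))
	for _, k := range order {
		out = append(out, latest[k])
	}
	return out
}

func main() {
	msgs := []kafkaMsg{{"k1", "old"}, {"k2", "v"}, {"k1", "new"}}
	fmt.Println(dedupeByKey(msgs)) // [{k1 new} {k2 v}]
}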
component/kafka/component_test.go: 17 changes (0 additions, 17 deletions)
@@ -175,23 +175,6 @@ func (mp *mockProcessor) GetExecs() int {
 	return mp.execs
 }
 
-func Test_mapHeader(t *testing.T) {
-	mp := mapHeader([]*sarama.RecordHeader{
-		{
-			Key:   []byte("X-HEADER-1"),
-			Value: []byte("1"),
-		},
-		{
-			Key:   []byte("X-HEADER-2"),
-			Value: []byte("2"),
-		},
-	})
-	assert.Equal(t, "1", mp["X-HEADER-1"])
-	assert.Equal(t, "2", mp["X-HEADER-2"])
-	_, ok := mp["X-HEADER-3"]
-	assert.False(t, ok)
-}
-
 type mockConsumerClaim struct {
 	ch   chan *sarama.ConsumerMessage
 	proc *mockProcessor
component/kafka/integration_test.go: 115 changes (58 additions, 57 deletions)
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -145,69 +146,69 @@ func assertSpan(t *testing.T, expected tracetest.SpanStub, got tracetest.SpanStub
 	assert.Equal(t, expected.Status, got.Status)
 }
 
-// func TestKafkaComponent_FailAllRetries(t *testing.T) {
-// require.NoError(t, createTopics(broker, failAllRetriesTopic2))
-// // Test parameters
-// numOfMessagesToSend := 100
-// errAtIndex := 70
+func TestKafkaComponent_FailAllRetries(t *testing.T) {
+	require.NoError(t, createTopics(broker, failAllRetriesTopic2))
+	// Test parameters
+	numOfMessagesToSend := 10
+	errAtIndex := 7
 
-// // Set up the kafka component
-// actualSuccessfulMessages := make([]int, 0)
-// actualNumOfRuns := int32(0)
-// processorFunc := func(batch Batch) error {
-// for _, msg := range batch.Messages() {
-// var msgContent string
-// err := decodeString(msg.Message().Value, &msgContent)
-// require.NoError(t, err)
+	// Set up the kafka component
+	actualSuccessfulMessages := make([]int, 0)
+	actualNumOfRuns := int32(0)
+	processorFunc := func(batch Batch) error {
+		for _, msg := range batch.Messages() {
+			var msgContent string
+			err := decodeString(msg.Message().Value, &msgContent)
+			require.NoError(t, err)
 
-// msgIndex, err := strconv.Atoi(msgContent)
-// require.NoError(t, err)
+			msgIndex, err := strconv.Atoi(msgContent)
+			require.NoError(t, err)
 
-// if msgIndex == errAtIndex {
-// atomic.AddInt32(&actualNumOfRuns, 1)
-// return errors.New("expected error")
-// }
-// actualSuccessfulMessages = append(actualSuccessfulMessages, msgIndex)
-// }
-// return nil
-// }
+			if msgIndex == errAtIndex {
+				atomic.AddInt32(&actualNumOfRuns, 1)
+				return errors.New("expected error")
+			}
+			actualSuccessfulMessages = append(actualSuccessfulMessages, msgIndex)
+		}
+		return nil
+	}
 
-// numOfRetries := uint(3)
-// batchSize := uint(1)
-// component := newComponent(t, failAllRetriesTopic2, numOfRetries, batchSize, processorFunc)
+	numOfRetries := uint(1)
+	batchSize := uint(1)
+	component := newComponent(t, failAllRetriesTopic2, numOfRetries, batchSize, processorFunc)
 
-// producer, err := newProducer(broker)
-// require.NoError(t, err)
+	producer, err := newProducer(broker)
+	require.NoError(t, err)
 
-// msgs := make([]*sarama.ProducerMessage, 0, numOfMessagesToSend)
+	msgs := make([]*sarama.ProducerMessage, 0, numOfMessagesToSend)
 
-// for i := 1; i <= numOfMessagesToSend; i++ {
-// msgs = append(msgs, &sarama.ProducerMessage{Topic: failAllRetriesTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
-// }
+	for i := 1; i <= numOfMessagesToSend; i++ {
+		msgs = append(msgs, &sarama.ProducerMessage{Topic: failAllRetriesTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
+	}
 
-// err = producer.SendMessages(msgs)
-// require.NoError(t, err)
+	err = producer.SendMessages(msgs)
+	require.NoError(t, err)
 
-// err = component.Run(context.Background())
-// require.Error(t, err)
+	err = component.Run(context.Background())
+	require.Error(t, err)
 
-// // Verify all messages were processed in the right order
-// for i := 0; i < len(actualSuccessfulMessages)-1; i++ {
-// if actualSuccessfulMessages[i+1] > errAtIndex {
-// assert.Fail(t, "message higher than expected", "i is %d and i+1 is %d", actualSuccessfulMessages[i+1],
-// errAtIndex)
-// }
+	// Verify all messages were processed in the right order
+	for i := 0; i < len(actualSuccessfulMessages)-1; i++ {
+		if actualSuccessfulMessages[i+1] > errAtIndex {
+			assert.Fail(t, "message higher than expected", "i is %d and i+1 is %d", actualSuccessfulMessages[i+1],
+				errAtIndex)
+		}
 
-// diff := actualSuccessfulMessages[i+1] - actualSuccessfulMessages[i]
-// if diff == 0 || diff == 1 {
-// continue
-// }
-// assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualSuccessfulMessages[i],
-// actualSuccessfulMessages[i+1])
-// }
+		diff := actualSuccessfulMessages[i+1] - actualSuccessfulMessages[i]
+		if diff == 0 || diff == 1 {
+			continue
+		}
+		assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualSuccessfulMessages[i],
+			actualSuccessfulMessages[i+1])
+	}
 
-// assert.Equal(t, int32(numOfRetries+1), actualNumOfRuns)
-// }
+	assert.Equal(t, int32(numOfRetries+1), actualNumOfRuns)
+}
 
 // func TestKafkaComponent_FailOnceAndRetry(t *testing.T) {
 // require.NoError(t, createTopics(broker, failAndRetryTopic2))
@@ -392,10 +393,10 @@ func createTopics(broker string, topics ...string) error {
 	return brk.Close()
 }
 
-// func newProducer(broker string) (sarama.SyncProducer, error) {
-// config := sarama.NewConfig()
-// config.Producer.Return.Successes = true
-// config.Producer.Return.Errors = true
+func newProducer(broker string) (sarama.SyncProducer, error) {
+	config := sarama.NewConfig()
+	config.Producer.Return.Successes = true
+	config.Producer.Return.Errors = true
 
-// return sarama.NewSyncProducer([]string{broker}, config)
-// }
+	return sarama.NewSyncProducer([]string{broker}, config)
+}
