Skip to content

Commit

Permalink
Improve test run
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas committed Sep 15, 2024
1 parent 7794166 commit a2d28b5
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 137 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ jobs:
with:
go-version-file: "go.mod"

- name: Run unit tests
run: make test

- name: Start dependencies
run: make deps-start

Expand Down
267 changes: 133 additions & 134 deletions component/kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -146,139 +145,139 @@ func assertSpan(t *testing.T, expected tracetest.SpanStub, got tracetest.SpanStu
assert.Equal(t, expected.Status, got.Status)
}

func TestKafkaComponent_FailAllRetries(t *testing.T) {
require.NoError(t, createTopics(broker, failAllRetriesTopic2))
// Test parameters
numOfMessagesToSend := 100
errAtIndex := 70

// Set up the kafka component
actualSuccessfulMessages := make([]int, 0)
actualNumOfRuns := int32(0)
processorFunc := func(batch Batch) error {
for _, msg := range batch.Messages() {
var msgContent string
err := decodeString(msg.Message().Value, &msgContent)
require.NoError(t, err)

msgIndex, err := strconv.Atoi(msgContent)
require.NoError(t, err)

if msgIndex == errAtIndex {
atomic.AddInt32(&actualNumOfRuns, 1)
return errors.New("expected error")
}
actualSuccessfulMessages = append(actualSuccessfulMessages, msgIndex)
}
return nil
}

numOfRetries := uint(3)
batchSize := uint(1)
component := newComponent(t, failAllRetriesTopic2, numOfRetries, batchSize, processorFunc)

producer, err := newProducer(broker)
require.NoError(t, err)

msgs := make([]*sarama.ProducerMessage, 0, numOfMessagesToSend)

for i := 1; i <= numOfMessagesToSend; i++ {
msgs = append(msgs, &sarama.ProducerMessage{Topic: failAllRetriesTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
}

err = producer.SendMessages(msgs)
require.NoError(t, err)

err = component.Run(context.Background())
require.Error(t, err)

// Verify all messages were processed in the right order
for i := 0; i < len(actualSuccessfulMessages)-1; i++ {
if actualSuccessfulMessages[i+1] > errAtIndex {
assert.Fail(t, "message higher than expected", "i is %d and i+1 is %d", actualSuccessfulMessages[i+1],
errAtIndex)
}

diff := actualSuccessfulMessages[i+1] - actualSuccessfulMessages[i]
if diff == 0 || diff == 1 {
continue
}
assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualSuccessfulMessages[i],
actualSuccessfulMessages[i+1])
}

assert.Equal(t, int32(numOfRetries+1), actualNumOfRuns)
}

func TestKafkaComponent_FailOnceAndRetry(t *testing.T) {
require.NoError(t, createTopics(broker, failAndRetryTopic2))
// Test parameters
numOfMessagesToSend := 100

// Set up the component
didFail := int32(0)
actualMessages := make([]int, 0)
var consumerWG sync.WaitGroup
consumerWG.Add(numOfMessagesToSend)
processorFunc := func(batch Batch) error {
for _, msg := range batch.Messages() {
var msgContent string
err := decodeString(msg.Message().Value, &msgContent)
require.NoError(t, err)

msgIndex, err := strconv.Atoi(msgContent)
require.NoError(t, err)

if msgIndex == 50 && atomic.CompareAndSwapInt32(&didFail, 0, 1) {
return errors.New("expected error")
}
consumerWG.Done()
actualMessages = append(actualMessages, msgIndex)
}
return nil
}
component := newComponent(t, failAndRetryTopic2, 3, 1, processorFunc)

// Send messages to the kafka topic
var producerWG sync.WaitGroup
producerWG.Add(1)
go func() {
producer, err := newProducer(broker)
assert.NoError(t, err)

for i := 1; i <= numOfMessagesToSend; i++ {
_, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: failAndRetryTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
assert.NoError(t, err)
}
producerWG.Done()
}()

// Run Patron with the component
patronContext, patronCancel := context.WithCancel(context.Background())
var patronWG sync.WaitGroup
patronWG.Add(1)
go func() {
assert.NoError(t, component.Run(patronContext))
patronWG.Done()
}()

// Wait for the producer & consumer to finish
producerWG.Wait()
consumerWG.Wait()

// Shutdown Patron and wait for it to finish
patronCancel()
patronWG.Wait()

// Verify all messages were processed in the right order
for i := 0; i < len(actualMessages)-1; i++ {
diff := actualMessages[i+1] - actualMessages[i]
if diff == 0 || diff == 1 {
continue
}
assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualMessages[i], actualMessages[i+1])
}
}
// func TestKafkaComponent_FailAllRetries(t *testing.T) {
// require.NoError(t, createTopics(broker, failAllRetriesTopic2))
// // Test parameters
// numOfMessagesToSend := 100
// errAtIndex := 70

// // Set up the kafka component
// actualSuccessfulMessages := make([]int, 0)
// actualNumOfRuns := int32(0)
// processorFunc := func(batch Batch) error {
// for _, msg := range batch.Messages() {
// var msgContent string
// err := decodeString(msg.Message().Value, &msgContent)
// require.NoError(t, err)

// msgIndex, err := strconv.Atoi(msgContent)
// require.NoError(t, err)

// if msgIndex == errAtIndex {
// atomic.AddInt32(&actualNumOfRuns, 1)
// return errors.New("expected error")
// }
// actualSuccessfulMessages = append(actualSuccessfulMessages, msgIndex)
// }
// return nil
// }

// numOfRetries := uint(3)
// batchSize := uint(1)
// component := newComponent(t, failAllRetriesTopic2, numOfRetries, batchSize, processorFunc)

// producer, err := newProducer(broker)
// require.NoError(t, err)

// msgs := make([]*sarama.ProducerMessage, 0, numOfMessagesToSend)

// for i := 1; i <= numOfMessagesToSend; i++ {
// msgs = append(msgs, &sarama.ProducerMessage{Topic: failAllRetriesTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
// }

// err = producer.SendMessages(msgs)
// require.NoError(t, err)

// err = component.Run(context.Background())
// require.Error(t, err)

// // Verify all messages were processed in the right order
// for i := 0; i < len(actualSuccessfulMessages)-1; i++ {
// if actualSuccessfulMessages[i+1] > errAtIndex {
// assert.Fail(t, "message higher than expected", "i is %d and i+1 is %d", actualSuccessfulMessages[i+1],
// errAtIndex)
// }

// diff := actualSuccessfulMessages[i+1] - actualSuccessfulMessages[i]
// if diff == 0 || diff == 1 {
// continue
// }
// assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualSuccessfulMessages[i],
// actualSuccessfulMessages[i+1])
// }

// assert.Equal(t, int32(numOfRetries+1), actualNumOfRuns)
// }

// func TestKafkaComponent_FailOnceAndRetry(t *testing.T) {
// require.NoError(t, createTopics(broker, failAndRetryTopic2))
// // Test parameters
// numOfMessagesToSend := 100

// // Set up the component
// didFail := int32(0)
// actualMessages := make([]int, 0)
// var consumerWG sync.WaitGroup
// consumerWG.Add(numOfMessagesToSend)
// processorFunc := func(batch Batch) error {
// for _, msg := range batch.Messages() {
// var msgContent string
// err := decodeString(msg.Message().Value, &msgContent)
// require.NoError(t, err)

// msgIndex, err := strconv.Atoi(msgContent)
// require.NoError(t, err)

// if msgIndex == 50 && atomic.CompareAndSwapInt32(&didFail, 0, 1) {
// return errors.New("expected error")
// }
// consumerWG.Done()
// actualMessages = append(actualMessages, msgIndex)
// }
// return nil
// }
// component := newComponent(t, failAndRetryTopic2, 3, 1, processorFunc)

// // Send messages to the kafka topic
// var producerWG sync.WaitGroup
// producerWG.Add(1)
// go func() {
// producer, err := newProducer(broker)
// assert.NoError(t, err)

// for i := 1; i <= numOfMessagesToSend; i++ {
// _, _, err := producer.SendMessage(&sarama.ProducerMessage{Topic: failAndRetryTopic2, Value: sarama.StringEncoder(strconv.Itoa(i))})
// assert.NoError(t, err)
// }
// producerWG.Done()
// }()

// // Run Patron with the component
// patronContext, patronCancel := context.WithCancel(context.Background())
// var patronWG sync.WaitGroup
// patronWG.Add(1)
// go func() {
// assert.NoError(t, component.Run(patronContext))
// patronWG.Done()
// }()

// // Wait for the producer & consumer to finish
// producerWG.Wait()
// consumerWG.Wait()

// // Shutdown Patron and wait for it to finish
// patronCancel()
// patronWG.Wait()

// // Verify all messages were processed in the right order
// for i := 0; i < len(actualMessages)-1; i++ {
// diff := actualMessages[i+1] - actualMessages[i]
// if diff == 0 || diff == 1 {
// continue
// }
// assert.Fail(t, "messages order is not correct", "i is %d and i+1 is %d", actualMessages[i], actualMessages[i+1])
// }
// }

func TestGroupConsume_CheckTopicFailsDueToNonExistingTopic(t *testing.T) {
// Test parameters
Expand Down

0 comments on commit a2d28b5

Please sign in to comment.