Skip to content

Commit

Permalink
Test suite can run multiple times (#312)
Browse files Browse the repository at this point in the history
* Each test that uses a topic now creates its own.

Topic names use test name and unix time, so it is easy to
find and inspect it when the test fails. This also removes
some offsets that worked around other tests having polluted
the common topic.

* Do not depend on values being shared between assert closures.
  • Loading branch information
bittrance authored Dec 3, 2024
1 parent 9ccf3fe commit 27bfd01
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 63 deletions.
21 changes: 13 additions & 8 deletions kafka_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/grafana/sobek"
kafkago "github.com/segmentio/kafka-go"
Expand All @@ -20,6 +22,7 @@ const (

// struct to keep all the things test need in one place.
type kafkaTest struct {
topicName string
rt *sobek.Runtime
module *Module
vu *modulestest.VU
Expand Down Expand Up @@ -52,8 +55,10 @@ func getTestModuleInstance(tb testing.TB) *kafkaTest {
require.True(tb, ok)

require.NoError(tb, runtime.Set("kafka", moduleInstance.Exports().Default))
topicName := fmt.Sprintf("%s-%d", tb.Name(), time.Now().UnixMilli())

return &kafkaTest{
topicName: topicName,
rt: runtime,
module: moduleInstance,
vu: mockVU,
Expand Down Expand Up @@ -99,37 +104,37 @@ func (k *kafkaTest) getCounterMetricsValues() map[string]float64 {
}

// newWriter creates a Kafka writer for the reader tests.
func (k *kafkaTest) newWriter(topicName string) *kafkago.Writer {
func (k *kafkaTest) newWriter() *kafkago.Writer {
// Create a writer to produce messages.
return k.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// newReader creates a Kafka reader for the reader tests.
func (k *kafkaTest) newReader(topicName string) *kafkago.Reader {
func (k *kafkaTest) newReader() *kafkago.Reader {
// Create a reader to consume messages.
return k.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// createTopic creates a topic.
func (k *kafkaTest) createTopic(topicName string) {
func (k *kafkaTest) createTopic() {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
defer connection.Close()

// Create a topic.
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: topicName})
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: k.topicName})
}

// topicExists checks if a topic exists.
func (k *kafkaTest) topicExists(topicName string) bool {
func (k *kafkaTest) topicExists() bool {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
Expand All @@ -139,7 +144,7 @@ func (k *kafkaTest) topicExists(topicName string) bool {
// Create a topic.
topics := k.module.Kafka.listTopics(connection)
for _, topic := range topics {
if topic == topicName {
if topic == k.topicName {
return true
}
}
Expand Down
81 changes: 40 additions & 41 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
// The reader should not hang
func TestConsumerMaxWaitExceeded(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
Expand Down Expand Up @@ -52,17 +53,17 @@ func TestConsumerMaxWaitExceeded(t *testing.T) {
// nolint: funlen
func TestConsume(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic("test-topic")
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

assert.True(t, test.topicExists("test-topic"))
assert.True(t, test.topicExists())

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()
Expand Down Expand Up @@ -139,19 +140,19 @@ func TestConsume(t *testing.T) {
// TestConsumeWithoutKey tests the consume function without a key.
func TestConsumeWithoutKey(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 1,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -164,7 +165,6 @@ func TestConsumeWithoutKey(t *testing.T) {
Data: "value1",
SchemaType: String,
}),
Offset: 1,
},
},
})
Expand Down Expand Up @@ -198,18 +198,19 @@ func TestConsumeWithoutKey(t *testing.T) {
// TestConsumerContextCancelled tests the consume function and fails on a cancelled context.
func TestConsumerContextCancelled(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -232,8 +233,7 @@ func TestConsumerContextCancelled(t *testing.T) {

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
})

Expand All @@ -249,19 +249,19 @@ func TestConsumerContextCancelled(t *testing.T) {
// TestConsumeJSON tests the consume function with a JSON value.
func TestConsumeJSON(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 3,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -273,8 +273,7 @@ func TestConsumeJSON(t *testing.T) {
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: serialized,
Offset: 3,
Value: serialized,
},
},
})
Expand Down Expand Up @@ -309,8 +308,8 @@ func TestReaderClass(t *testing.T) {
test := getTestModuleInstance(t)

require.NoError(t, test.moveToVUCode())
test.createTopic("test-reader-class")
writer := test.newWriter("test-reader-class")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

test.module.Kafka.produce(writer, &ProduceConfig{
Expand All @@ -334,7 +333,7 @@ func TestReaderClass(t *testing.T) {
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-reader-class",
"topic": test.topicName,
"maxWait": "3s",
},
),
Expand All @@ -344,7 +343,7 @@ func TestReaderClass(t *testing.T) {
this := reader.Get("This").Export().(*kafkago.Reader)
assert.NotNil(t, this)
assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"})
assert.Equal(t, this.Config().Topic, "test-reader-class")
assert.Equal(t, this.Config().Topic, test.topicName)
assert.Equal(t, this.Config().MaxWait, time.Second*3)

consume := reader.Get("consume").Export().(func(sobek.FunctionCall) sobek.Value)
Expand Down
6 changes: 3 additions & 3 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestGetLatestSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -125,7 +125,7 @@ func TestGetSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -147,7 +147,7 @@ func TestCreateSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand Down
6 changes: 3 additions & 3 deletions serdes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
func TestSerdes(t *testing.T) {
test := getTestModuleInstance(t)

test.createTopic("test-serdes-topic")
writer := test.newWriter("test-serdes-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()
reader := test.newReader("test-serdes-topic")
reader := test.newReader()
defer reader.Close()

// Switch to VU code.
Expand Down
19 changes: 11 additions & 8 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
// nolint: funlen
func TestProduce(t *testing.T) {
test := getTestModuleInstance(t)
assert.True(t, test.topicExists("test-topic"))
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -106,6 +106,7 @@ func TestProduce(t *testing.T) {
// TestProduceWithoutKey tests the produce function without a key.
func TestProduceWithoutKey(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Expand All @@ -125,7 +126,7 @@ func TestProduceWithoutKey(t *testing.T) {
Data: "value1",
SchemaType: String,
}),
Topic: "test-topic",
Topic: test.topicName,
Offset: 0,
Time: time.Now(),
},
Expand All @@ -134,7 +135,7 @@ func TestProduceWithoutKey(t *testing.T) {
Data: "value2",
SchemaType: String,
}),
Topic: "test-topic",
Topic: test.topicName,
},
},
})
Expand All @@ -153,11 +154,12 @@ func TestProduceWithoutKey(t *testing.T) {
// TestProducerContextCancelled tests the produce function with a cancelled context.
func TestProducerContextCancelled(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -208,11 +210,12 @@ func TestProducerContextCancelled(t *testing.T) {
// TestProduceJSON tests the produce function with a JSON value.
func TestProduceJSON(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -247,15 +250,15 @@ func TestWriterClass(t *testing.T) {
test := getTestModuleInstance(t)

require.NoError(t, test.moveToVUCode())
test.createTopic("test-writer-class")
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.writerClass(sobek.ConstructorCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-writer-class",
"topic": test.topicName,
},
),
},
Expand Down

0 comments on commit 27bfd01

Please sign in to comment.