Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test suite can run multiple times #312

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions kafka_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/grafana/sobek"
kafkago "github.com/segmentio/kafka-go"
Expand All @@ -20,6 +22,7 @@ const (

// struct to keep all the things test need in one place.
type kafkaTest struct {
topicName string
rt *sobek.Runtime
module *Module
vu *modulestest.VU
Expand Down Expand Up @@ -52,8 +55,10 @@ func getTestModuleInstance(tb testing.TB) *kafkaTest {
require.True(tb, ok)

require.NoError(tb, runtime.Set("kafka", moduleInstance.Exports().Default))
topicName := fmt.Sprintf("%s-%d", tb.Name(), time.Now().UnixMilli())

return &kafkaTest{
topicName: topicName,
rt: runtime,
module: moduleInstance,
vu: mockVU,
Expand Down Expand Up @@ -99,37 +104,37 @@ func (k *kafkaTest) getCounterMetricsValues() map[string]float64 {
}

// newWriter creates a Kafka writer for the reader tests.
func (k *kafkaTest) newWriter(topicName string) *kafkago.Writer {
func (k *kafkaTest) newWriter() *kafkago.Writer {
// Create a writer to produce messages.
return k.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// newReader creates a Kafka reader for the reader tests.
func (k *kafkaTest) newReader(topicName string) *kafkago.Reader {
func (k *kafkaTest) newReader() *kafkago.Reader {
// Create a reader to consume messages.
return k.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// createTopic creates a topic.
func (k *kafkaTest) createTopic(topicName string) {
func (k *kafkaTest) createTopic() {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
defer connection.Close()

// Create a topic.
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: topicName})
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: k.topicName})
}

// topicExists checks if a topic exists.
func (k *kafkaTest) topicExists(topicName string) bool {
func (k *kafkaTest) topicExists() bool {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
Expand All @@ -139,7 +144,7 @@ func (k *kafkaTest) topicExists(topicName string) bool {
// Create a topic.
topics := k.module.Kafka.listTopics(connection)
for _, topic := range topics {
if topic == topicName {
if topic == k.topicName {
return true
}
}
Expand Down
81 changes: 40 additions & 41 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
// The reader should not hang
func TestConsumerMaxWaitExceeded(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
Expand Down Expand Up @@ -52,17 +53,17 @@ func TestConsumerMaxWaitExceeded(t *testing.T) {
// nolint: funlen
func TestConsume(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic("test-topic")
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

assert.True(t, test.topicExists("test-topic"))
assert.True(t, test.topicExists())

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()
Expand Down Expand Up @@ -139,19 +140,19 @@ func TestConsume(t *testing.T) {
// TestConsumeWithoutKey tests the consume function without a key.
func TestConsumeWithoutKey(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 1,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -164,7 +165,6 @@ func TestConsumeWithoutKey(t *testing.T) {
Data: "value1",
SchemaType: String,
}),
Offset: 1,
},
},
})
Expand Down Expand Up @@ -198,18 +198,19 @@ func TestConsumeWithoutKey(t *testing.T) {
// TestConsumerContextCancelled tests the consume function and fails on a cancelled context.
func TestConsumerContextCancelled(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -232,8 +233,7 @@ func TestConsumerContextCancelled(t *testing.T) {

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
})

Expand All @@ -249,19 +249,19 @@ func TestConsumerContextCancelled(t *testing.T) {
// TestConsumeJSON tests the consume function with a JSON value.
func TestConsumeJSON(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 3,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -273,8 +273,7 @@ func TestConsumeJSON(t *testing.T) {
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: serialized,
Offset: 3,
Value: serialized,
},
},
})
Expand Down Expand Up @@ -309,8 +308,8 @@ func TestReaderClass(t *testing.T) {
test := getTestModuleInstance(t)

require.NoError(t, test.moveToVUCode())
test.createTopic("test-reader-class")
writer := test.newWriter("test-reader-class")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

test.module.Kafka.produce(writer, &ProduceConfig{
Expand All @@ -334,7 +333,7 @@ func TestReaderClass(t *testing.T) {
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-reader-class",
"topic": test.topicName,
"maxWait": "3s",
},
),
Expand All @@ -344,7 +343,7 @@ func TestReaderClass(t *testing.T) {
this := reader.Get("This").Export().(*kafkago.Reader)
assert.NotNil(t, this)
assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"})
assert.Equal(t, this.Config().Topic, "test-reader-class")
assert.Equal(t, this.Config().Topic, test.topicName)
assert.Equal(t, this.Config().MaxWait, time.Second*3)

consume := reader.Get("consume").Export().(func(sobek.FunctionCall) sobek.Value)
Expand Down
6 changes: 3 additions & 3 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestGetLatestSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -125,7 +125,7 @@ func TestGetSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -147,7 +147,7 @@ func TestCreateSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand Down
6 changes: 3 additions & 3 deletions serdes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
func TestSerdes(t *testing.T) {
test := getTestModuleInstance(t)

test.createTopic("test-serdes-topic")
writer := test.newWriter("test-serdes-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()
reader := test.newReader("test-serdes-topic")
reader := test.newReader()
defer reader.Close()

// Switch to VU code.
Expand Down
19 changes: 11 additions & 8 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
// nolint: funlen
func TestProduce(t *testing.T) {
test := getTestModuleInstance(t)
assert.True(t, test.topicExists("test-topic"))
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -106,6 +106,7 @@ func TestProduce(t *testing.T) {
// TestProduceWithoutKey tests the produce function without a key.
func TestProduceWithoutKey(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Expand All @@ -125,7 +126,7 @@ func TestProduceWithoutKey(t *testing.T) {
Data: "value1",
SchemaType: String,
}),
Topic: "test-topic",
Topic: test.topicName,
Offset: 0,
Time: time.Now(),
},
Expand All @@ -134,7 +135,7 @@ func TestProduceWithoutKey(t *testing.T) {
Data: "value2",
SchemaType: String,
}),
Topic: "test-topic",
Topic: test.topicName,
},
},
})
Expand All @@ -153,11 +154,12 @@ func TestProduceWithoutKey(t *testing.T) {
// TestProducerContextCancelled tests the produce function with a cancelled context.
func TestProducerContextCancelled(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -208,11 +210,12 @@ func TestProducerContextCancelled(t *testing.T) {
// TestProduceJSON tests the produce function with a JSON value.
func TestProduceJSON(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, writer)
defer writer.Close()
Expand Down Expand Up @@ -247,15 +250,15 @@ func TestWriterClass(t *testing.T) {
test := getTestModuleInstance(t)

require.NoError(t, test.moveToVUCode())
test.createTopic("test-writer-class")
test.createTopic()

assert.NotPanics(t, func() {
writer := test.module.writerClass(sobek.ConstructorCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-writer-class",
"topic": test.topicName,
},
),
},
Expand Down
Loading