From 27bfd016284a8df1ac3bb6f5ceb4c2bdd884c80a Mon Sep 17 00:00:00 2001 From: Bittrance Date: Tue, 3 Dec 2024 11:54:51 +0100 Subject: [PATCH] Test suite can run multiple times (#312) * Each test that uses a topic now creates its own. Topic names use test name and unix time, so it is easy to find and inspect it when the test fails. This also removes some offsets that worked around other tests having polluted the common topic. * Do not depend on values being shared between assert closures. --- kafka_helpers_test.go | 21 +++++++---- reader_test.go | 81 ++++++++++++++++++++--------------------- schema_registry_test.go | 6 +-- serdes_test.go | 6 +-- writer_test.go | 19 ++++++---- 5 files changed, 70 insertions(+), 63 deletions(-) diff --git a/kafka_helpers_test.go b/kafka_helpers_test.go index 4e36ea0..c9d2e2e 100644 --- a/kafka_helpers_test.go +++ b/kafka_helpers_test.go @@ -2,7 +2,9 @@ package kafka import ( "context" + "fmt" "testing" + "time" "github.com/grafana/sobek" kafkago "github.com/segmentio/kafka-go" @@ -20,6 +22,7 @@ const ( // struct to keep all the things test need in one place. type kafkaTest struct { + topicName string rt *sobek.Runtime module *Module vu *modulestest.VU @@ -52,8 +55,10 @@ func getTestModuleInstance(tb testing.TB) *kafkaTest { require.True(tb, ok) require.NoError(tb, runtime.Set("kafka", moduleInstance.Exports().Default)) + topicName := fmt.Sprintf("%s-%d", tb.Name(), time.Now().UnixMilli()) return &kafkaTest{ + topicName: topicName, rt: runtime, module: moduleInstance, vu: mockVU, @@ -99,25 +104,25 @@ func (k *kafkaTest) getCounterMetricsValues() map[string]float64 { } // newWriter creates a Kafka writer for the reader tests. -func (k *kafkaTest) newWriter(topicName string) *kafkago.Writer { +func (k *kafkaTest) newWriter() *kafkago.Writer { // Create a writer to produce messages. return k.module.Kafka.writer(&WriterConfig{ Brokers: []string{"localhost:9092"}, - Topic: topicName, + Topic: k.topicName, }) } // newReader creates a Kafka reader for the reader tests. -func (k *kafkaTest) newReader(topicName string) *kafkago.Reader { +func (k *kafkaTest) newReader() *kafkago.Reader { // Create a reader to consume messages. return k.module.Kafka.reader(&ReaderConfig{ Brokers: []string{"localhost:9092"}, - Topic: topicName, + Topic: k.topicName, }) } // createTopic creates a topic. -func (k *kafkaTest) createTopic(topicName string) { +func (k *kafkaTest) createTopic() { // Create a connection to Kafka. connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{ Address: "localhost:9092", @@ -125,11 +130,11 @@ func (k *kafkaTest) createTopic(topicName string) { defer connection.Close() // Create a topic. - k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: topicName}) + k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: k.topicName}) } // topicExists checks if a topic exists. -func (k *kafkaTest) topicExists(topicName string) bool { +func (k *kafkaTest) topicExists() bool { // Create a connection to Kafka. connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{ Address: "localhost:9092", @@ -139,7 +144,7 @@ func (k *kafkaTest) topicExists(topicName string) bool { // Create a topic. topics := k.module.Kafka.listTopics(connection) for _, topic := range topics { - if topic == topicName { + if topic == k.topicName { return true } } diff --git a/reader_test.go b/reader_test.go index 483c082..ed924b9 100644 --- a/reader_test.go +++ b/reader_test.go @@ -16,14 +16,15 @@ import ( // The reader should not hang func TestConsumerMaxWaitExceeded(t *testing.T) { test := getTestModuleInstance(t) - writer := test.newWriter("test-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() // Create a reader to consume messages. assert.NotPanics(t, func() { reader := test.module.Kafka.reader(&ReaderConfig{ Brokers: []string{"localhost:9092"}, - Topic: "test-topic", + Topic: test.topicName, MaxWait: Duration{time.Second * 3}, }) assert.NotNil(t, reader) @@ -52,17 +53,17 @@ func TestConsumerMaxWaitExceeded(t *testing.T) { // nolint: funlen func TestConsume(t *testing.T) { test := getTestModuleInstance(t) - test.createTopic("test-topic") - writer := test.newWriter("test-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() - assert.True(t, test.topicExists("test-topic")) + assert.True(t, test.topicExists()) // Create a reader to consume messages. assert.NotPanics(t, func() { reader := test.module.Kafka.reader(&ReaderConfig{ Brokers: []string{"localhost:9092"}, - Topic: "test-topic", + Topic: test.topicName, }) assert.NotNil(t, reader) defer reader.Close() @@ -139,19 +140,19 @@ func TestConsume(t *testing.T) { // TestConsumeWithoutKey tests the consume function without a key. func TestConsumeWithoutKey(t *testing.T) { test := getTestModuleInstance(t) - writer := test.newWriter("test-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + }) + assert.NotNil(t, reader) + defer reader.Close() + // Create a reader to consume messages. assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "test-topic", - Offset: 1, - }) - assert.NotNil(t, reader) - defer reader.Close() - // Switch to VU code. require.NoError(t, test.moveToVUCode()) @@ -164,7 +165,6 @@ func TestConsumeWithoutKey(t *testing.T) { Data: "value1", SchemaType: String, }), - Offset: 1, }, }, }) @@ -198,18 +198,19 @@ func TestConsumeWithoutKey(t *testing.T) { // TestConsumerContextCancelled tests the consume function and fails on a cancelled context. func TestConsumerContextCancelled(t *testing.T) { test := getTestModuleInstance(t) - writer := test.newWriter("test-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + }) + assert.NotNil(t, reader) + defer reader.Close() + // Create a reader to consume messages. assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "test-topic", - }) - assert.NotNil(t, reader) - defer reader.Close() - // Switch to VU code. require.NoError(t, test.moveToVUCode()) @@ -232,8 +233,7 @@ func TestConsumerContextCancelled(t *testing.T) { // Consume a message in the VU function. assert.Panics(t, func() { - messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) - assert.Empty(t, messages) + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) }) }) @@ -249,19 +249,19 @@ func TestConsumerContextCancelled(t *testing.T) { // TestConsumeJSON tests the consume function with a JSON value. func TestConsumeJSON(t *testing.T) { test := getTestModuleInstance(t) - writer := test.newWriter("test-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + }) + assert.NotNil(t, reader) + defer reader.Close() + // Create a reader to consume messages. assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "test-topic", - Offset: 3, - }) - assert.NotNil(t, reader) - defer reader.Close() - // Switch to VU code. require.NoError(t, test.moveToVUCode()) @@ -273,8 +273,7 @@ func TestConsumeJSON(t *testing.T) { test.module.Kafka.produce(writer, &ProduceConfig{ Messages: []Message{ { - Value: serialized, - Offset: 3, + Value: serialized, }, }, }) @@ -309,8 +308,8 @@ func TestReaderClass(t *testing.T) { test := getTestModuleInstance(t) require.NoError(t, test.moveToVUCode()) - test.createTopic("test-reader-class") - writer := test.newWriter("test-reader-class") + test.createTopic() + writer := test.newWriter() defer writer.Close() test.module.Kafka.produce(writer, &ProduceConfig{ @@ -334,7 +333,7 @@ func TestReaderClass(t *testing.T) { test.module.vu.Runtime().ToValue( map[string]interface{}{ "brokers": []string{"localhost:9092"}, - "topic": "test-reader-class", + "topic": test.topicName, "maxWait": "3s", }, ), @@ -344,7 +343,7 @@ func TestReaderClass(t *testing.T) { this := reader.Get("This").Export().(*kafkago.Reader) assert.NotNil(t, this) assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"}) - assert.Equal(t, this.Config().Topic, "test-reader-class") + assert.Equal(t, this.Config().Topic, test.topicName) assert.Equal(t, this.Config().MaxWait, time.Second*3) consume := reader.Get("consume").Export().(func(sobek.FunctionCall) sobek.Value) diff --git a/schema_registry_test.go b/schema_registry_test.go index c2ddea3..ae0abbf 100644 --- a/schema_registry_test.go +++ b/schema_registry_test.go @@ -103,7 +103,7 @@ func TestGetLatestSchemaFails(t *testing.T) { srClient := test.module.Kafka.schemaRegistryClient(&srConfig) assert.Panics(t, func() { schema := test.module.Kafka.getSchema(srClient, &Schema{ - Subject: "test-subject", + Subject: "no-such-subject", Version: 0, }) assert.Equal(t, schema, nil) @@ -125,7 +125,7 @@ func TestGetSchemaFails(t *testing.T) { srClient := test.module.Kafka.schemaRegistryClient(&srConfig) assert.Panics(t, func() { schema := test.module.Kafka.getSchema(srClient, &Schema{ - Subject: "test-subject", + Subject: "no-such-subject", Version: 0, }) assert.Equal(t, schema, nil) @@ -147,7 +147,7 @@ func TestCreateSchemaFails(t *testing.T) { srClient := test.module.Kafka.schemaRegistryClient(&srConfig) assert.Panics(t, func() { schema := test.module.Kafka.getSchema(srClient, &Schema{ - Subject: "test-subject", + Subject: "no-such-subject", Version: 0, }) assert.Equal(t, schema, nil) diff --git a/serdes_test.go b/serdes_test.go index 050f863..9e240e6 100644 --- a/serdes_test.go +++ b/serdes_test.go @@ -15,10 +15,10 @@ import ( func TestSerdes(t *testing.T) { test := getTestModuleInstance(t) - test.createTopic("test-serdes-topic") - writer := test.newWriter("test-serdes-topic") + test.createTopic() + writer := test.newWriter() defer writer.Close() - reader := test.newReader("test-serdes-topic") + reader := test.newReader() defer reader.Close() // Switch to VU code. diff --git a/writer_test.go b/writer_test.go index 84eb5b5..e886048 100644 --- a/writer_test.go +++ b/writer_test.go @@ -14,12 +14,12 @@ import ( // nolint: funlen func TestProduce(t *testing.T) { test := getTestModuleInstance(t) - assert.True(t, test.topicExists("test-topic")) + test.createTopic() assert.NotPanics(t, func() { writer := test.module.Kafka.writer(&WriterConfig{ Brokers: []string{"localhost:9092"}, - Topic: "test-topic", + Topic: test.topicName, }) assert.NotNil(t, writer) defer writer.Close() @@ -106,6 +106,7 @@ func TestProduce(t *testing.T) { // TestProduceWithoutKey tests the produce function without a key. func TestProduceWithoutKey(t *testing.T) { test := getTestModuleInstance(t) + test.createTopic() assert.NotPanics(t, func() { writer := test.module.Kafka.writer(&WriterConfig{ @@ -125,7 +126,7 @@ func TestProduceWithoutKey(t *testing.T) { Data: "value1", SchemaType: String, }), - Topic: "test-topic", + Topic: test.topicName, Offset: 0, Time: time.Now(), }, @@ -134,7 +135,7 @@ func TestProduceWithoutKey(t *testing.T) { Data: "value2", SchemaType: String, }), - Topic: "test-topic", + Topic: test.topicName, }, }, }) @@ -153,11 +154,12 @@ func TestProduceWithoutKey(t *testing.T) { // TestProducerContextCancelled tests the produce function with a cancelled context. func TestProducerContextCancelled(t *testing.T) { test := getTestModuleInstance(t) + test.createTopic() assert.NotPanics(t, func() { writer := test.module.Kafka.writer(&WriterConfig{ Brokers: []string{"localhost:9092"}, - Topic: "test-topic", + Topic: test.topicName, }) assert.NotNil(t, writer) defer writer.Close() @@ -208,11 +210,12 @@ func TestProducerContextCancelled(t *testing.T) { // TestProduceJSON tests the produce function with a JSON value. func TestProduceJSON(t *testing.T) { test := getTestModuleInstance(t) + test.createTopic() assert.NotPanics(t, func() { writer := test.module.Kafka.writer(&WriterConfig{ Brokers: []string{"localhost:9092"}, - Topic: "test-topic", + Topic: test.topicName, }) assert.NotNil(t, writer) defer writer.Close() @@ -247,7 +250,7 @@ func TestWriterClass(t *testing.T) { test := getTestModuleInstance(t) require.NoError(t, test.moveToVUCode()) - test.createTopic("test-writer-class") + test.createTopic() assert.NotPanics(t, func() { writer := test.module.writerClass(sobek.ConstructorCall{ @@ -255,7 +258,7 @@ func TestWriterClass(t *testing.T) { test.module.vu.Runtime().ToValue( map[string]interface{}{ "brokers": []string{"localhost:9092"}, - "topic": "test-writer-class", + "topic": test.topicName, }, ), },