From d262f7591cc069b883af305e4240b3df07c8904a Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 25 Oct 2023 16:18:03 +0000 Subject: [PATCH] Try to fix race again - don't reuse clients for sender/receiver Signed-off-by: Doug Davis --- .../kafka_sarama_binding/kafka_test.go | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/test/integration/kafka_sarama_binding/kafka_test.go b/test/integration/kafka_sarama_binding/kafka_test.go index f7ea024ab..dfc2d18d2 100644 --- a/test/integration/kafka_sarama_binding/kafka_test.go +++ b/test/integration/kafka_sarama_binding/kafka_test.go @@ -10,7 +10,6 @@ import ( "os" "strings" "testing" - "time" "github.com/IBM/sarama" "github.com/google/uuid" @@ -91,25 +90,28 @@ func testClient(t testing.TB) sarama.Client { } func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) { - client := testClient(t) - topicName := "test-ce-client-" + uuid.New().String() - p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID)) + + clientR := testClient(t) + pR, err := kafka_sarama.NewProtocolFromClient(clientR, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID)) require.NoError(t, err) - require.NotNil(t, p) + require.NotNil(t, pR) + + clientS := testClient(t) + pS, err := kafka_sarama.NewProtocolFromClient(clientS, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID)) + require.NoError(t, err) + require.NotNil(t, pS) go func() { - require.NoError(t, p.OpenInbound(context.TODO())) + require.NoError(t, pR.OpenInbound(context.TODO())) }() - // Not perfect but we need to give OpenInbound() as chance to start - // as it's a race condition. I couldn't find something on 'p' to wait for - time.Sleep(6 * time.Second) - return func() { - require.NoError(t, p.Close(context.TODO())) - require.NoError(t, client.Close()) - }, p, p, topicName + require.NoError(t, pR.Close(context.TODO())) + require.NoError(t, pS.Close(context.TODO())) + require.NoError(t, clientR.Close()) + require.NoError(t, clientS.Close()) + }, pS, pR, topicName } func BenchmarkSendReceive(b *testing.B) {