Skip to content

Commit

Permalink
Try to fix race again - don't reuse clients for sender/receiver
Browse files Browse the repository at this point in the history
Signed-off-by: Doug Davis <[email protected]>
  • Loading branch information
Doug Davis committed Oct 26, 2023
1 parent d7f845b commit d262f75
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions test/integration/kafka_sarama_binding/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/google/uuid"
Expand Down Expand Up @@ -91,25 +90,28 @@ func testClient(t testing.TB) sarama.Client {
}

func testSenderReceiver(t testing.TB) (func(), bindings.Sender, bindings.Receiver, string) {
client := testClient(t)

topicName := "test-ce-client-" + uuid.New().String()
p, err := kafka_sarama.NewProtocolFromClient(client, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))

clientR := testClient(t)
pR, err := kafka_sarama.NewProtocolFromClient(clientR, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
require.NoError(t, err)
require.NotNil(t, p)
require.NotNil(t, pR)

clientS := testClient(t)
pS, err := kafka_sarama.NewProtocolFromClient(clientS, topicName, topicName, kafka_sarama.WithReceiverGroupId(TEST_GROUP_ID))
require.NoError(t, err)
require.NotNil(t, pS)

go func() {
require.NoError(t, p.OpenInbound(context.TODO()))
require.NoError(t, pR.OpenInbound(context.TODO()))
}()

// Not perfect but we need to give OpenInbound() as chance to start
// as it's a race condition. I couldn't find something on 'p' to wait for
time.Sleep(6 * time.Second)

return func() {
require.NoError(t, p.Close(context.TODO()))
require.NoError(t, client.Close())
}, p, p, topicName
require.NoError(t, pR.Close(context.TODO()))
require.NoError(t, pS.Close(context.TODO()))
require.NoError(t, clientR.Close())
require.NoError(t, clientS.Close())
}, pS, pR, topicName
}

func BenchmarkSendReceive(b *testing.B) {
Expand Down

0 comments on commit d262f75

Please sign in to comment.