From 6ba9e474a305b71d222e652d39aa9d9cc76912a4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Fri, 27 Oct 2023 14:50:22 +0000 Subject: [PATCH] Try to make sure the Receiver starts before we send events Signed-off-by: Doug Davis --- v2/client/test/test.go | 22 ++++++++++++++++------ v2/protocol/test/test.go | 10 ++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/v2/client/test/test.go b/v2/client/test/test.go index 82f9925d2..d855af410 100644 --- a/v2/client/test/test.go +++ b/v2/client/test/test.go @@ -25,13 +25,20 @@ import ( func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Event, outAssert func(e event.Event), opts ...client.Option) { t.Helper() pf := protocolFactory() - c, err := client.New(pf, opts...) + + // Create a sender and receiver client since we can't assume it's safe + // to use the same one for both roles + + sender, err := client.New(pf, opts...) + require.NoError(t, err) + + receiver, err := client.New(pf, opts...) require.NoError(t, err) + wg := sync.WaitGroup{} wg.Add(2) - // Give time for Kafka client protocol to get setup - time.Sleep(2 * time.Second) + receiverReady := make(chan bool) go func() { ctx, cancel := context.WithCancel(context.TODO()) @@ -42,7 +49,8 @@ func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Even wg.Done() }(inCh) go func(channel chan event.Event) { - err := c.StartReceiver(ctx, func(e event.Event) { + receiverReady <- true + err := receiver.StartReceiver(ctx, func(e event.Event) { channel <- e }) if err != nil { @@ -53,12 +61,14 @@ func SendReceive(t *testing.T, protocolFactory func() interface{}, in event.Even outAssert(e) }() - // Give time for the receiever to start + // Wait for receiver to be setup. Not 100% perefect but the channel + the + // sleep should do it + <-receiverReady time.Sleep(2 * time.Second) go func() { defer wg.Done() - err := c.Send(context.Background(), in) + err := sender.Send(context.Background(), in) require.NoError(t, err) }() diff --git a/v2/protocol/test/test.go b/v2/protocol/test/test.go index 3a250e360..57349df90 100644 --- a/v2/protocol/test/test.go +++ b/v2/protocol/test/test.go @@ -25,8 +25,13 @@ func SendReceive(t *testing.T, ctx context.Context, in binding.Message, s protoc wg := sync.WaitGroup{} wg.Add(2) + // Used to try to make sure the receiver is ready before we start to + // send events + wait := make(chan bool) + go func() { defer wg.Done() + wait <- true out, result := r.Receive(ctx) if !protocol.IsACK(result) { require.NoError(t, result) @@ -36,6 +41,11 @@ func SendReceive(t *testing.T, ctx context.Context, in binding.Message, s protoc require.NoError(t, finishErr) }() + // Wait until receiver thread starts, and then wait a second to + // give the "Receive" call a chance to start (finger's crossed) + <-wait + time.Sleep(time.Second) + go func() { defer wg.Done() mx := sync.Mutex{}