From 311273beaa97486663018db13838f27b4f0bc890 Mon Sep 17 00:00:00 2001 From: frairon Date: Thu, 13 Jun 2019 09:33:19 +0200 Subject: [PATCH] bugfix issue 188: tester for emitter uses tester.Consume to make it wait for messages to be consumed --- options.go | 4 +-- tester/tester.go | 33 ++++++++++++++++++++++ tester/tester_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 2 deletions(-) diff --git a/options.go b/options.go index 7e3f36e8..efb25c8d 100644 --- a/options.go +++ b/options.go @@ -181,6 +181,7 @@ type Tester interface { StorageBuilder() storage.Builder ConsumerBuilder() kafka.ConsumerBuilder ProducerBuilder() kafka.ProducerBuilder + EmitterProducerBuilder() kafka.ProducerBuilder TopicManagerBuilder() kafka.TopicManagerBuilder RegisterGroupGraph(*GroupGraph) RegisterEmitter(Stream, Codec) @@ -412,12 +413,11 @@ func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption { func WithEmitterTester(t Tester) EmitterOption { return func(o *eoptions, topic Stream, codec Codec) { - o.builders.producer = t.ProducerBuilder() + o.builders.producer = t.EmitterProducerBuilder() o.builders.topicmgr = t.TopicManagerBuilder() t.RegisterEmitter(topic, codec) } } - func (opt *eoptions) applyOptions(topic Stream, codec Codec, opts ...EmitterOption) error { opt.clientID = defaultClientID opt.log = logger.Default() diff --git a/tester/tester.go b/tester/tester.go index 055cc70a..fa65aa2e 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -214,6 +214,19 @@ func (km *Tester) ProducerBuilder() kafka.ProducerBuilder { } } +// EmitterProducerBuilder creates a producer builder used for Emitters. +// Emitters need to flush when emitting messages. +func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder { + builder := km.ProducerBuilder() + return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) { + prod, err := builder(b, cid, hasher) + return &flushingProducer{ + tester: km, + producer: prod, + }, err + } +} + // StorageBuilder returns the storage builder when this tester is used as an option // to a processor func (km *Tester) StorageBuilder() storage.Builder { @@ -268,6 +281,7 @@ func (km *Tester) waitStartup() { // Consume a message using the topic's configured codec func (km *Tester) Consume(topic string, key string, msg interface{}) { km.waitStartup() + log.Printf("startup") // if the user wants to send a nil for some reason, // just let her. Goka should handle it accordingly :) @@ -426,3 +440,22 @@ func (p *producerMock) Close() error { logger.Printf("Closing producer mock") return nil } + +// flushingProducer wraps the producer and +// waits for all consumers after the Emit. +type flushingProducer struct { + tester *Tester + producer kafka.Producer +} + +// Emit using the underlying producer +func (e *flushingProducer) Emit(topic string, key string, value []byte) *kafka.Promise { + prom := e.producer.Emit(topic, key, value) + e.tester.waitForConsumers() + return prom +} + +// Close using the underlying producer +func (e *flushingProducer) Close() error { + return e.producer.Close() +} diff --git a/tester/tester_test.go b/tester/tester_test.go index 288d1e6c..33ec2977 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -3,6 +3,7 @@ package tester import ( "context" "fmt" + "log" "reflect" "sync" "testing" @@ -498,3 +499,67 @@ func Test_ManyConsume(t *testing.T) { t.Fatalf("did not receive all messages") } } + +func TestEmitterStandalone(t *testing.T) { + gkt := New(t) + + em, _ := goka.NewEmitter(nil, "test", new(codec.String), goka.WithEmitterTester(gkt)) + est := gkt.NewQueueTracker("test") + + em.Emit("key", "value") + + _, _, ok := est.Next() + if !ok { + t.Errorf("No message emitted") + } +} + +// Tests an emitter used inside a processor. +// For this the user has to start the emitter with +// a separate tester, otherwise it will deadlock. +func TestEmitterInProcessor(t *testing.T) { + + gktProcessor := New(t) + + // create a new emitter mocked with an extra tester + gktEmitter := New(t) + em, _ := goka.NewEmitter(nil, "output", new(codec.String), goka.WithEmitterTester(gktEmitter)) + + // create a new processor that uses the emitter internally + proc, err := goka.NewProcessor(nil, goka.DefineGroup("test-group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + log.Printf("sending") + + prom, err := em.Emit(ctx.Key(), msg) + log.Printf("sending done") + if err != nil { + t.Errorf("Error emitting in processor: %v", err) + } + prom.Then(func(err error) { + if err != nil { + t.Errorf("Error emitting in processor (in promise): %v", err) + } + }) + log.Printf("done") + }), + ), + goka.WithTester(gktProcessor), + ) + + if err != nil { + log.Fatalf("Error creating processor: %v", err) + } + runProcOrFail(proc) + + // create a queue tracker from the extra tester + est := gktEmitter.NewQueueTracker("output") + + // consume a message + gktProcessor.Consume("input", "key", "value") + + // ensure the message is there + _, _, ok := est.Next() + if !ok { + t.Errorf("No message emitted") + } +}