Skip to content

Commit

Permalink
Merge pull request #190 from lovoo/188/emitter-tester-flush
Browse files Browse the repository at this point in the history
bugfix #188: tester for emitter
  • Loading branch information
frairon authored Jun 18, 2019
2 parents 6636bb3 + 311273b commit dfc3e62
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type Tester interface {
StorageBuilder() storage.Builder
ConsumerBuilder() kafka.ConsumerBuilder
ProducerBuilder() kafka.ProducerBuilder
EmitterProducerBuilder() kafka.ProducerBuilder
TopicManagerBuilder() kafka.TopicManagerBuilder
RegisterGroupGraph(*GroupGraph)
RegisterEmitter(Stream, Codec)
Expand Down Expand Up @@ -412,12 +413,11 @@ func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption {

func WithEmitterTester(t Tester) EmitterOption {
return func(o *eoptions, topic Stream, codec Codec) {
o.builders.producer = t.ProducerBuilder()
o.builders.producer = t.EmitterProducerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
t.RegisterEmitter(topic, codec)
}
}

func (opt *eoptions) applyOptions(topic Stream, codec Codec, opts ...EmitterOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
33 changes: 33 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ func (km *Tester) ProducerBuilder() kafka.ProducerBuilder {
}
}

// EmitterProducerBuilder creates a producer builder used for Emitters.
// Emitters need to flush when emitting messages.
func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder {
builder := km.ProducerBuilder()
return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) {
prod, err := builder(b, cid, hasher)
return &flushingProducer{
tester: km,
producer: prod,
}, err
}
}

// StorageBuilder returns the storage builder when this tester is used as an option
// to a processor
func (km *Tester) StorageBuilder() storage.Builder {
Expand Down Expand Up @@ -268,6 +281,7 @@ func (km *Tester) waitStartup() {
// Consume a message using the topic's configured codec
func (km *Tester) Consume(topic string, key string, msg interface{}) {
km.waitStartup()
log.Printf("startup")

// if the user wants to send a nil for some reason,
// just let her. Goka should handle it accordingly :)
Expand Down Expand Up @@ -426,3 +440,22 @@ func (p *producerMock) Close() error {
logger.Printf("Closing producer mock")
return nil
}

// flushingProducer wraps the producer and
// waits for all consumers after the Emit.
type flushingProducer struct {
tester *Tester
producer kafka.Producer
}

// Emit using the underlying producer
func (e *flushingProducer) Emit(topic string, key string, value []byte) *kafka.Promise {
prom := e.producer.Emit(topic, key, value)
e.tester.waitForConsumers()
return prom
}

// Close using the underlying producer
func (e *flushingProducer) Close() error {
return e.producer.Close()
}
65 changes: 65 additions & 0 deletions tester/tester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tester
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -498,3 +499,67 @@ func Test_ManyConsume(t *testing.T) {
t.Fatalf("did not receive all messages")
}
}

func TestEmitterStandalone(t *testing.T) {
gkt := New(t)

em, _ := goka.NewEmitter(nil, "test", new(codec.String), goka.WithEmitterTester(gkt))
est := gkt.NewQueueTracker("test")

em.Emit("key", "value")

_, _, ok := est.Next()
if !ok {
t.Errorf("No message emitted")
}
}

// Tests an emitter used inside a processor.
// For this the user has to start the emitter with
// a separate tester, otherwise it will deadlock.
func TestEmitterInProcessor(t *testing.T) {

gktProcessor := New(t)

// create a new emitter mocked with an extra tester
gktEmitter := New(t)
em, _ := goka.NewEmitter(nil, "output", new(codec.String), goka.WithEmitterTester(gktEmitter))

// create a new processor that uses the emitter internally
proc, err := goka.NewProcessor(nil, goka.DefineGroup("test-group",
goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) {
log.Printf("sending")

prom, err := em.Emit(ctx.Key(), msg)
log.Printf("sending done")
if err != nil {
t.Errorf("Error emitting in processor: %v", err)
}
prom.Then(func(err error) {
if err != nil {
t.Errorf("Error emitting in processor (in promise): %v", err)
}
})
log.Printf("done")
}),
),
goka.WithTester(gktProcessor),
)

if err != nil {
log.Fatalf("Error creating processor: %v", err)
}
runProcOrFail(proc)

// create a queue tracker from the extra tester
est := gktEmitter.NewQueueTracker("output")

// consume a message
gktProcessor.Consume("input", "key", "value")

// ensure the message is there
_, _, ok := est.Next()
if !ok {
t.Errorf("No message emitted")
}
}

0 comments on commit dfc3e62

Please sign in to comment.