Merge pull request #341 from lovoo/visitor-tool-poc
Visitor Tool implementation
Uwe Jugel authored Aug 27, 2021
2 parents eaaa070 + d252a36 commit 15746b5
Showing 23 changed files with 1,288 additions and 147 deletions.
24 changes: 17 additions & 7 deletions context.go
@@ -134,6 +134,16 @@ type Context interface {
DeferCommit() func(error)
}

+ type message struct {
+ 	key       string
+ 	timestamp time.Time
+ 	topic     string
+ 	offset    int64
+ 	partition int32
+ 	headers   []*sarama.RecordHeader
+ 	value     []byte
+ }

type cbContext struct {
ctx context.Context
graph *GroupGraph
@@ -160,7 +170,7 @@ type cbContext struct {
// tracking statistics for the output topic
trackOutputStats func(ctx context.Context, topic string, size int)

- msg *sarama.ConsumerMessage
+ msg *message
done bool
counters struct {
emits int
@@ -262,32 +272,32 @@ func (ctx *cbContext) SetValue(value interface{}, options ...ContextOption) {

// Timestamp returns the timestamp of the input message.
func (ctx *cbContext) Timestamp() time.Time {
- return ctx.msg.Timestamp
+ return ctx.msg.timestamp
}

func (ctx *cbContext) Key() string {
- return string(ctx.msg.Key)
+ return ctx.msg.key
}

func (ctx *cbContext) Topic() Stream {
- return Stream(ctx.msg.Topic)
+ return Stream(ctx.msg.topic)
}

func (ctx *cbContext) Offset() int64 {
- return ctx.msg.Offset
+ return ctx.msg.offset
}

func (ctx *cbContext) Group() Group {
return ctx.graph.Group()
}

func (ctx *cbContext) Partition() int32 {
- return ctx.msg.Partition
+ return ctx.msg.partition
}

func (ctx *cbContext) Headers() Headers {
if ctx.headers == nil {
- ctx.headers = HeadersFromSarama(ctx.msg.Headers)
+ ctx.headers = HeadersFromSarama(ctx.msg.headers)
}
return ctx.headers
}
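
The new internal message struct above decouples cbContext from sarama.ConsumerMessage, presumably so that the visitor can build a context for a stored key without a real Kafka message behind it. Below is a minimal sketch of how an incoming Sarama message could be mapped onto the new struct; the helper name messageFromSarama is an assumption for illustration, not necessarily what the commit uses, and placed alongside the struct in context.go it would need no extra imports.

// messageFromSarama copies the fields of a consumed Kafka message into
// goka's new internal message struct (illustrative sketch, not part of the diff).
func messageFromSarama(cm *sarama.ConsumerMessage) *message {
	return &message{
		key:       string(cm.Key),
		timestamp: cm.Timestamp,
		topic:     cm.Topic,
		offset:    cm.Offset,
		partition: cm.Partition,
		headers:   cm.Headers,
		value:     cm.Value,
	}
}

Note that Headers() still converts the raw Sarama record headers lazily and caches the result in ctx.headers, so that cost is only paid when a callback actually reads them.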

44 changes: 24 additions & 20 deletions context_test.go
@@ -128,8 +128,8 @@ func TestContext_Timestamp(t *testing.T) {
ts := time.Now()

ctx := &cbContext{
- msg: &sarama.ConsumerMessage{
- 	Timestamp: ts,
+ msg: &message{
+ 	timestamp: ts,
},
}

@@ -205,7 +205,7 @@ func TestContext_GetSetStateless(t *testing.T) {
// ctx stateless since no storage passed
ctx := &cbContext{
graph: DefineGroup("group"),
- msg: new(sarama.ConsumerMessage),
+ msg: new(message),
syncFailer: func(err error) { panic(err) },
}
func() {
@@ -242,7 +242,7 @@ func TestContext_Delete(t *testing.T) {
graph: DefineGroup(group, Persist(new(codec.String))),
wg: new(sync.WaitGroup),
commit: func() { ack++ },
- msg: &sarama.ConsumerMessage{Offset: offset},
+ msg: &message{offset: offset},
table: pt,
}

@@ -275,7 +275,7 @@ func TestContext_DeleteStateless(t *testing.T) {
ctx := &cbContext{
graph: DefineGroup(group),
wg: new(sync.WaitGroup),
- msg: &sarama.ConsumerMessage{Offset: offset},
+ msg: &message{offset: offset},
}
ctx.emitter = newEmitter(nil, nil)

@@ -306,7 +306,7 @@ func TestContext_DeleteStorageError(t *testing.T) {
ctx := &cbContext{
graph: DefineGroup(group, Persist(new(codec.String))),
wg: new(sync.WaitGroup),
- msg: &sarama.ConsumerMessage{Offset: offset},
+ msg: &message{offset: offset},
table: pt,
}

@@ -342,7 +342,7 @@ func TestContext_Set(t *testing.T) {
wg: new(sync.WaitGroup),
commit: func() { ack++ },
trackOutputStats: func(ctx context.Context, topic string, size int) {},
- msg: &sarama.ConsumerMessage{Key: []byte(key), Offset: offset},
+ msg: &message{key: key, offset: offset},
table: pt,
ctx: context.Background(),
}
@@ -396,7 +396,7 @@ func TestContext_GetSetStateful(t *testing.T) {
wg: wg,
graph: graph,
trackOutputStats: func(ctx context.Context, topic string, size int) {},
- msg: &sarama.ConsumerMessage{Key: []byte(key), Offset: offset},
+ msg: &message{key: key, offset: offset},
emitter: func(tp string, k string, v []byte, h Headers) *Promise {
wg.Add(1)
test.AssertEqual(t, tp, graph.GroupTable().Topic())
@@ -445,7 +445,7 @@ func TestContext_SetErrors(t *testing.T) {
trackOutputStats: func(ctx context.Context, topic string, size int) {},
wg: wg,
graph: DefineGroup(group, Persist(new(codec.String))),
- msg: &sarama.ConsumerMessage{Key: []byte(key), Offset: offset},
+ msg: &message{key: key, offset: offset},
syncFailer: failer,
asyncFailer: failer,
}
@@ -478,7 +478,7 @@ func TestContext_LoopbackNoLoop(t *testing.T) {
// ctx has no loop set
ctx := &cbContext{
graph: DefineGroup("group", Persist(c)),
- msg: &sarama.ConsumerMessage{},
+ msg: &message{},
syncFailer: func(err error) { panic(err) },
}
func() {
@@ -502,7 +502,7 @@ func TestContext_Loopback(t *testing.T) {
graph := DefineGroup("group", Persist(c), Loop(c, cb))
ctx := &cbContext{
graph: graph,
- msg: &sarama.ConsumerMessage{},
+ msg: &message{},
trackOutputStats: func(ctx context.Context, topic string, size int) {},
emitter: func(tp string, k string, v []byte, h Headers) *Promise {
cnt++
@@ -532,7 +532,7 @@ func TestContext_Join(t *testing.T) {

ctx := &cbContext{
graph: DefineGroup("group", Persist(c), Loop(c, cb), Join(table, c)),
- msg: &sarama.ConsumerMessage{Key: []byte(key)},
+ msg: &message{key: key},
pviews: map[string]*PartitionTable{
string(table): {
log: defaultLogger,
@@ -581,7 +581,7 @@ func TestContext_Lookup(t *testing.T) {

ctx := &cbContext{
graph: DefineGroup("group", Persist(c), Loop(c, cb)),
- msg: &sarama.ConsumerMessage{Key: []byte(key)},
+ msg: &message{key: key},
views: map[string]*View{
string(table): {
opts: &voptions{
@@ -628,19 +628,23 @@ func TestContext_Headers(t *testing.T) {

// context without headers will return empty map
ctx := &cbContext{
- msg: &sarama.ConsumerMessage{Key: []byte("key")},
+ msg: &message{
+ 	key: "key",
+ },
}
headers := ctx.Headers()
test.AssertNotNil(t, headers)
test.AssertEqual(t, len(headers), 0)

ctx = &cbContext{
- msg: &sarama.ConsumerMessage{Key: []byte("key"), Headers: []*sarama.RecordHeader{
- 	{
- 		Key: []byte("key"),
- 		Value: []byte("value"),
- 	},
- }},
+ msg: &message{
+ 	key: "key",
+ 	headers: []*sarama.RecordHeader{
+ 		{
+ 			Key: []byte("key"),
+ 			Value: []byte("value"),
+ 		},
+ 	}},
}
headers = ctx.Headers()
test.AssertEqual(t, headers["key"], []byte("value"))

2 changes: 1 addition & 1 deletion doc.go
@@ -1,4 +1,4 @@
- //go:generate go-bindata -pkg templates -o web/templates/bindata.go web/templates/common/... web/templates/monitor/... web/templates/query/... web/templates/index/...
+ //go:generate go-bindata -pkg templates -o web/templates/bindata.go web/templates/...
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockstorage.go github.com/lovoo/goka/storage Storage
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mocks.go github.com/lovoo/goka TopicManager,Producer,
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockssarama.go github.com/Shopify/sarama Client,ClusterAdmin

136 changes: 136 additions & 0 deletions examples/10-visit/main.go
@@ -0,0 +1,136 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/multierr"
)

var (
brokers = []string{"localhost:9092"}
topic goka.Stream = "example-migration-clicks-input"
group goka.Group = "example-migration-group"

tmc *goka.TopicManagerConfig
)

func init() {
// This sets the default replication to 1. If you have more than one broker
// the default configuration can be used.
tmc = goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
}

// emit messages until stopped
func runEmitter(ctx context.Context) {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.Int64))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ticker.C:

err = emitter.EmitSync(fmt.Sprintf("key-%d", i%10), int64(1))
if err != nil {
log.Fatalf("error emitting message: %v", err)
}
case <-ctx.Done():
return
}
}
}

func main() {

tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}

sigs := make(chan os.Signal, 1) // buffered so a signal arriving early is not dropped
go func() {
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errg, ctx := multierr.NewErrGroup(ctx)

g := goka.DefineGroup(group,
goka.Input(topic, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
var counter int64
if val := ctx.Value(); val != nil {
counter = val.(int64)
}
counter += msg.(int64)
log.Printf("%s: %d", ctx.Key(), counter)
ctx.SetValue(counter)

}),
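		// The visitor below is not attached to an input topic. It only runs when
		// Processor.VisitAll is called (see further down in main), once for every
		// key held in the group table, receiving the meta value passed to VisitAll.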
goka.Visitor("reset", func(ctx goka.Context, meta interface{}) {
log.Printf("resetting %s: %d", ctx.Key(), meta.(int64))
ctx.SetValue(meta)
}),
goka.Persist(new(codec.Int64)),
)

proc, err := goka.NewProcessor(brokers,
g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
)
if err != nil {
log.Fatalf("error creating processor: %v", err)
}

// start the emitter
errg.Go(func() error {
runEmitter(ctx)
return nil
})

// start the processor
errg.Go(func() error {
return proc.Run(ctx)
})

errg.Go(func() error {
select {
case <-sigs:
case <-ctx.Done():
}
cancel()
return nil
})

time.Sleep(5 * time.Second)

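	// Visit every key currently stored in the processor's table with the "reset"
	// visitor; each visit invokes the callback above with meta=int64(0), resetting
	// that key's counter.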
if err := proc.VisitAll(ctx, "reset", int64(0)); err != nil {
log.Printf("error visiting: %v", err)
}

time.Sleep(5 * time.Second)
log.Printf("stopping...")
cancel()
if err := errg.Wait().NilOrError(); err != nil {
log.Fatalf("error running: %v", err)
}
log.Printf("done.")

}