Skip to content

Commit

Permalink
Merge pull request #297 from lovoo/improve-topicmanager-creation
Browse files Browse the repository at this point in the history
topic-manager improvements
  • Loading branch information
frairon authored Feb 10, 2021
2 parents dbc15b1 + 16f5055 commit f804627
Show file tree
Hide file tree
Showing 15 changed files with 1,073 additions and 271 deletions.
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:generate go-bindata -pkg templates -o web/templates/bindata.go web/templates/common/... web/templates/monitor/... web/templates/query/... web/templates/index/...
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockstorage.go github.com/lovoo/goka/storage Storage
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mocks.go github.com/lovoo/goka TopicManager,Producer,Broker
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockssarama.go github.com/Shopify/sarama Client
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mocks.go github.com/lovoo/goka TopicManager,Producer,
//go:generate mockgen -self_package github.com/lovoo/goka -package goka -destination mockssarama.go github.com/Shopify/sarama Client,ClusterAdmin

/*
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases
Expand Down
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package goka

import (
"errors"
"fmt"
reflect "reflect"
"regexp"
Expand All @@ -13,6 +14,7 @@ var (
errBuildConsumer = "error creating Kafka consumer: %v"
errBuildProducer = "error creating Kafka producer: %v"
errApplyOptions = "error applying options: %v"
errTopicNotFound = errors.New("requested topic was not found")
)

var (
Expand Down
22 changes: 18 additions & 4 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ func (gg *GroupGraph) isOutputTopic(topic Stream) bool {

// inputs returns all input topics (tables and streams)
func (gg *GroupGraph) inputs() Edges {
return append(append(gg.inputStreams, gg.inputTables...), gg.crossTables...)
return chainEdges(gg.inputStreams, gg.inputTables, gg.crossTables)
}

// copartitioned returns all copartitioned topics (joint tables and input streams)
func (gg *GroupGraph) copartitioned() Edges {
return append(gg.inputStreams, gg.inputTables...)
return chainEdges(gg.inputStreams, gg.inputTables)
}

func (gg *GroupGraph) codec(topic string) Codec {
Expand Down Expand Up @@ -196,8 +196,7 @@ func (gg *GroupGraph) Validate() error {
if len(gg.inputStreams) == 0 {
return errors.New("no input stream in group graph")
}
for _, t := range append(gg.outputStreams,
append(gg.inputStreams, append(gg.inputTables, gg.crossTables...)...)...) {
for _, t := range chainEdges(gg.outputStreams, gg.inputStreams, gg.inputTables, gg.crossTables) {
if t.Topic() == loopName(gg.Group()) {
return errors.New("should not directly use loop stream")
}
Expand All @@ -219,6 +218,21 @@ type Edge interface {
// Edges is a slice of edge objects.
type Edges []Edge

// chainEdges chains edges together to avoid error-prone
// append(edges, moreEdges...) constructs in the graph
func chainEdges(edgeList ...Edges) Edges {
var sum int
for _, edges := range edgeList {
sum += len(edges)
}
chained := make(Edges, 0, sum)

for _, edges := range edgeList {
chained = append(chained, edges...)
}
return chained
}

// Topics returns the names of the topics of the edges.
func (e Edges) Topics() []string {
var t []string
Expand Down
7 changes: 6 additions & 1 deletion graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ func TestGroupGraph_Validate(t *testing.T) {
)
err = g.Validate()
test.AssertStringContains(t, err.Error(), "loop stream")
}

func TestGroupGraph_chainEdges(t *testing.T) {
test.AssertEqual(t, len(chainEdges()), 0)
test.AssertEqual(t, len(chainEdges(Edges{}, Edges{})), 0)
test.AssertEqual(t, chainEdges(Edges{Join("a", nil)}, Edges{}), Edges{Join("a", nil)})
test.AssertEqual(t, chainEdges(Edges{Join("a", nil)}, Edges{Join("a", nil), Join("b", nil)}), Edges{Join("a", nil), Join("a", nil), Join("b", nil)})
}

func TestGroupGraph_codec(t *testing.T) {
Expand All @@ -87,7 +93,6 @@ func TestGroupGraph_codec(t *testing.T) {
codec := g.codec(topic)
test.AssertEqual(t, codec, c)
}

}

func TestGroupGraph_callback(t *testing.T) {
Expand Down
106 changes: 106 additions & 0 deletions integrationtest/topicmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package integrationtest

import (
"crypto/rand"
"encoding/hex"
"flag"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka"
"github.com/lovoo/goka/internal/test"
)

var (
systemtest = flag.Bool("systemtest", false, "set to run systemtests that require a running kafka-version")
)

func TestTopicManagerCreate(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0

tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())([]string{"localhost:9092"})
test.AssertNil(t, err)

err = tm.EnsureTopicExists("test10", 4, 2, nil)
test.AssertNil(t, err)

}

// Tests the topic manager with sarama version v11 --> so it will test topic configuration using
// the sarama.ClusterAdmin
func TestTopicManager_v11(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.MismatchBehavior = goka.TMConfigMismatchBehaviorFail

tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{"localhost:9092"})
test.AssertNil(t, err)

client, _ := sarama.NewClient([]string{"localhost:9092"}, cfg)
admin, _ := sarama.NewClusterAdminFromClient(client)

t.Run("ensure-new-stream", func(t *testing.T) {
topic := newTopicName()

// delete topic, ignore error if it does not exist
admin.DeleteTopic(topic)

err := tm.EnsureStreamExists(topic, 10)
test.AssertNil(t, err)
time.Sleep(1 * time.Second)
// trying to create the same is fine
err = tm.EnsureStreamExists(topic, 10)
test.AssertNil(t, err)
time.Sleep(1 * time.Second)
// partitions changed - error
err = tm.EnsureStreamExists(topic, 11)
test.AssertNotNil(t, err)
})

t.Run("list-partitions", func(t *testing.T) {

var (
topic = newTopicName()
partitions []int32
err error
)
_, err = tm.Partitions(topic)
test.AssertNotNil(t, err)
test.AssertTrue(t, strings.Contains(err.Error(), "requested topic was not found"))
test.AssertEqual(t, len(partitions), 0)

tm.EnsureTableExists(topic, 123)
time.Sleep(1 * time.Second)
partitions, err = tm.Partitions(topic)
test.AssertNil(t, err)
test.AssertEqual(t, len(partitions), 123)

})

t.Run("non-existent", func(t *testing.T) {
// topic does not exist
partitions, err := tm.Partitions("non-existent-topic")
test.AssertTrue(t, len(partitions) == 0, "expected no partitions, was", partitions)
test.AssertNotNil(t, err)
})

}

func newTopicName() string {
topicBytes := make([]byte, 4)
rand.Read(topicBytes)
return hex.EncodeToString(topicBytes)
}
3 changes: 2 additions & 1 deletion internal/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func AssertNil(t Fataler, actual interface{}) {
value := reflect.ValueOf(actual)
if value.IsValid() {
if !value.IsNil() {
t.Fatalf("Expected value to be nil, but was not nil in %s", string(debug.Stack()))

t.Fatalf("Expected value to be nil, but was not nil (%v) in %s", actual, string(debug.Stack()))
}
}
}
Expand Down
6 changes: 0 additions & 6 deletions mockautoconsumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,5 @@ func (cg *MockConsumerGroup) Close() error {

// close old errs chan and create new one
close(cg.errs)
cg.errs = make(chan error)

cg.offset = 0
cg.currentGeneration = 0
cg.sessions = make(map[string]*MockConsumerGroupSession)
cg.failOnConsume = nil
return nil
}
4 changes: 2 additions & 2 deletions mockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type builderMock struct {
consumerGroup *MockConsumerGroup
producer *MockProducer
client *MockClient
broker *MockBroker
admin *MockClusterAdmin
}

func newBuilderMock(ctrl *gomock.Controller) *builderMock {
Expand All @@ -33,7 +33,7 @@ func newBuilderMock(ctrl *gomock.Controller) *builderMock {
tmgr: NewMockTopicManager(ctrl),
producer: NewMockProducer(ctrl),
client: NewMockClient(ctrl),
broker: NewMockBroker(ctrl),
admin: NewMockClusterAdmin(ctrl),
}
}

Expand Down
Loading

0 comments on commit f804627

Please sign in to comment.