Skip to content

Commit

Permalink
Co-authored-by: frairon <[email protected]>
Browse files Browse the repository at this point in the history
Co-authored-by: R053NR07 <[email protected]>
  • Loading branch information
frairon committed Mar 19, 2020
1 parent c7d7725 commit aa4b10f
Show file tree
Hide file tree
Showing 94 changed files with 9,137 additions and 10,031 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
tmp*
*.*~
.tags*
vendor
13 changes: 13 additions & 0 deletions assignment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package goka

import (
"fmt"
)

// Assignment represents a partition:offset assignment for the current connection
type Assignment map[int32]int64

func (a *Assignment) string() string {
var am map[int32]int64 = *a
return fmt.Sprintf("Assignment %v", am)
}
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package goka

import "github.com/Shopify/sarama"

// Broker is an interface for the sarama broker
type Broker interface {
Addr() string
Connected() (bool, error)
CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
Open(conf *sarama.Config) error
}
88 changes: 88 additions & 0 deletions builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package goka

import (
"hash"

"github.com/Shopify/sarama"
)

// ProducerBuilder create a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := globalConfig
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config)
}

// ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, config)
}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, NewTopicManagerConfig())
}

// TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
return NewTopicManager(brokers, config, tmConfig)
}
}

// TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, tmConfig)
}
}

type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

// DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, &config)
}

// ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder {
return func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, config)
}
}

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

// DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumer(brokers, &config)
}

// ConsumerBuilderWithConfig creates a sarama consumergroup using passed config
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder {
return func(brokers []string, clientID string) (sarama.Consumer, error) {
config.ClientID = clientID
return sarama.NewConsumer(brokers, config)
}
}
1 change: 0 additions & 1 deletion codec/codec_test.go

This file was deleted.

57 changes: 57 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package goka

import (
"time"

"github.com/Shopify/sarama"
)

var (
globalConfig = *DefaultConfig()
)

const (
// size of sarama buffer for consumer and producer
defaultChannelBufferSize = 256

// time sarama-cluster assumes the processing of an event may take
defaultMaxProcessingTime = 1 * time.Second

// producer flush configuration
defaultFlushFrequency = 100 * time.Millisecond
defaultFlushBytes = 64 * 1024
defaultProducerMaxRetries = 10
)

// DefaultConfig creates a new config used by goka per default
// Use it to modify and pass to `goka.ReplaceGlobalConifg(...)` to modify
// goka's global config
func DefaultConfig() *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
// this configures the initial offset for streams. Tables are always
// consumed from OffsetOldest.
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries
return config
}

// ReplaceGlobalConfig registeres a standard config used during building if no
// other config is specified
func ReplaceGlobalConfig(config *sarama.Config) {
if config == nil {
panic("nil config registered as global config")
}
globalConfig = *config
}
42 changes: 42 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package goka

import (
"testing"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/internal/test"
)

func TestConfig_DefaultConfig(t *testing.T) {
t.Run("equal", func(t *testing.T) {
cfg := DefaultConfig()
test.AssertTrue(t, cfg.Version == sarama.V2_0_0_0)
test.AssertTrue(t, cfg.Consumer.Return.Errors == true)
test.AssertTrue(t, cfg.Consumer.MaxProcessingTime == defaultMaxProcessingTime)
test.AssertTrue(t, cfg.Consumer.Offsets.Initial == sarama.OffsetNewest)
test.AssertTrue(t, cfg.Producer.RequiredAcks == sarama.WaitForLocal)
test.AssertTrue(t, cfg.Producer.Compression == sarama.CompressionSnappy)
test.AssertTrue(t, cfg.Producer.Flush.Frequency == defaultFlushFrequency)
test.AssertTrue(t, cfg.Producer.Flush.Bytes == defaultFlushBytes)
test.AssertTrue(t, cfg.Producer.Return.Successes == true)
test.AssertTrue(t, cfg.Producer.Return.Errors == true)
test.AssertTrue(t, cfg.Producer.Retry.Max == defaultProducerMaxRetries)
})
}

func TestConfig_ReplaceGlobalConfig(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
custom := DefaultConfig()
custom.Version = sarama.V0_8_2_0
ReplaceGlobalConfig(custom)
test.AssertEqual(t, globalConfig.Version, custom.Version)
})
t.Run("panic", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("there was no panic")
}
}()
ReplaceGlobalConfig(nil)
})
}
Loading

0 comments on commit aa4b10f

Please sign in to comment.