Code refactoring
xitonix committed Aug 16, 2019
1 parent 5ff28f7 commit 4acc70b
Showing 6 changed files with 250 additions and 189 deletions.
92 changes: 92 additions & 0 deletions bootstrap.go
@@ -0,0 +1,92 @@
package main

import (
"github.com/xitonix/flags"
"github.com/xitonix/flags/core"

"github.com/xitonix/trubka/kafka"
"github.com/xitonix/trubka/protobuf"
)

var (
profilingMode *core.StringFlag
protoDir *core.StringFlag
protoPrefix *core.StringFlag
topicPrefix *core.StringFlag
logFilePath *core.StringFlag
outputDir *core.StringFlag
topic *core.StringFlag
messageType *core.StringFlag
format *core.StringFlag
kafkaVersion *core.StringFlag
environment *core.StringFlag
topicFilter *core.StringFlag
typeFilter *core.StringFlag
searchQuery *core.StringFlag
saslUsername *core.StringFlag
saslPassword *core.StringFlag
saslMechanism *core.StringFlag
certCA *core.StringFlag
brokers *core.StringSliceFlag
protoFiles *core.StringSliceFlag
interactive *core.BoolFlag
reverse *core.BoolFlag
includeTimeStamp *core.BoolFlag
rewind *core.BoolFlag
enableAutoTopicCreation *core.BoolFlag
versionRequest *core.BoolFlag
timeCheckpoint *core.TimeFlag
offsetCheckpoint *core.Int64Flag
verbosity *core.CounterFlag
)

func initFlags() {
flags.EnableAutoKeyGeneration()
flags.SetKeyPrefix("TBK")
profilingMode = flags.String("profile", "Enables profiling.").WithValidRange(true, "cpu", "mem", "block", "mutex", "thread").Hide()

brokers = flags.StringSlice("brokers", "The comma separated list of Kafka brokers in server:port format.").WithShort("b")
protoDir = flags.String("proto-root", "The path to the folder where your *.proto files live.").WithShort("R")
topic = flags.String("topic", `The Kafka topic to consume from.`).WithShort("t")
messageType = flags.String("proto", `The fully qualified name of the protobuf type, stored in the given topic.`).WithShort("p")
format = flags.String("format", "The format in which the Kafka messages will be written to the output.").
WithValidRange(true, protobuf.Json, protobuf.JsonIndent, protobuf.Text, protobuf.TextIndent, protobuf.Hex, protobuf.HexIndent).
WithDefault(protobuf.JsonIndent)
protoFiles = flags.StringSlice("proto-files", `An optional list of the proto files to load. If not specified all the files in --proto-root will be processed.`)

interactive = flags.Bool("interactive", "Runs the tool in interactive mode.").WithShort("i")
protoPrefix = flags.String("proto-prefix", "The optional prefix to prepend to proto message names.")
topicPrefix = flags.String("topic-prefix", "The optional prefix to add to Kafka topic names.")

logFilePath = flags.String("log-file", "The `file` to write the logs to. Set to '' to discard (Default: stdout).").WithShort("l")
outputDir = flags.String("output-dir", "The `directory` to write the Kafka messages to. Set to '' to discard (Default: Stdout).").WithShort("d")

kafkaVersion = flags.String("kafka-version", "Kafka cluster version.").WithDefault(kafka.DefaultClusterVersion)
rewind = flags.Bool("rewind", `Starts consuming from the beginning of the stream.`).WithShort("w")
timeCheckpoint = flags.Time("from", `Starts consuming from the most recent available offset at the given time. This will override --rewind.`).WithShort("f")
offsetCheckpoint = flags.Int64("from-offset", `Starts consuming from the specified offset (if applicable). This will override --rewind and --from.
If the most recent offset value of a partition is less than the specified value, this flag will be ignored.`).WithShort("o")
environment = flags.String("environment", `To store the offsets on the disk in environment specific paths. It's only required
if you use Trubka to consume from different Kafka clusters on the same machine (eg. dev/prod).`).WithShort("E").WithDefault("offsets")
topicFilter = flags.String("topic-filter", "The optional regular expression to filter the remote topics by (Interactive mode only).").WithShort("n")
typeFilter = flags.String("type-filter", "The optional regular expression to filter the proto types with (Interactive mode only).").WithShort("m")
reverse = flags.Bool("reverse", "If set, the messages which match the --search-query will be filtered out.")
searchQuery = flags.String("search-query", "The optional regular expression to filter the message content by.").WithShort("q")
includeTimeStamp = flags.Bool("include-timestamp", "Prints the message timestamp before the content if it's been provided by Kafka.").WithShort("T")
enableAutoTopicCreation = flags.Bool("auto-topic-creation", `Enables automatic Kafka topic creation before consuming (if it is allowed on the server).
Enabling this option in production is not recommended since it may pollute the environment with unwanted topics.`)
saslMechanism = flags.String("sasl-mechanism", "SASL authentication mechanism.").
WithValidRange(true, kafka.SASLMechanismNone, kafka.SASLMechanismPlain, kafka.SASLMechanismSCRAM256, kafka.SASLMechanismSCRAM512).
WithDefault(kafka.SASLMechanismNone)

saslUsername = flags.String("sasl-username", "SASL authentication username. Will be ignored if --sasl-mechanism is set to none.").WithShort("U")
saslPassword = flags.String("sasl-password", "SASL authentication password. Will be ignored if --sasl-mechanism is set to none.").WithShort("P")

certCA = flags.String("tls", `The certificate authority file to enable TLS for communicating with the Kafka cluster.
If set to an empty string, TLS will be switched to unverified mode (not recommended).`)

verbosity = flags.Verbosity("The verbosity level of the tool.").WithKey("-")
versionRequest = flags.Bool("version", "Prints the current version of Trubka.").WithKey("-")

flags.Parse()
}
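The new bootstrap.go keeps every flag in a package-level variable and wires them all up in a single initFlags call before parsing. Below is a minimal, self-contained sketch of the same pattern using only the standard library flag package rather than github.com/xitonix/flags (whose builder-style API, e.g. WithShort and WithDefault, is shown above); the flag names and defaults here are illustrative only.

package main

import (
	"flag"
	"fmt"
)

var (
	topic   *string
	rewind  *bool
	brokers *string
)

// initFlags declares and parses all command line flags in one place,
// mirroring the structure of the initFlags function above.
func initFlags() {
	topic = flag.String("topic", "", "The Kafka topic to consume from.")
	rewind = flag.Bool("rewind", false, "Starts consuming from the beginning of the stream.")
	brokers = flag.String("brokers", "localhost:9092", "Comma separated list of Kafka brokers.")
	flag.Parse()
}

func main() {
	initFlags()
	fmt.Printf("topic=%q rewind=%v brokers=%q\n", *topic, *rewind, *brokers)
}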
6 changes: 3 additions & 3 deletions internal/printer.go
@@ -14,7 +14,7 @@ const (
type Printer interface {
Logf(level VerbosityLevel, format string, args ...interface{})
Log(level VerbosityLevel, msg string)
WriteMessage(topic string, bytes []byte)
WriteEvent(topic string, bytes []byte)
Level() VerbosityLevel
Close()
}
@@ -93,8 +93,8 @@ func (p *SyncPrinter) Level() VerbosityLevel {
return p.currentLevel
}

// WriteMessage writes the message to the relevant message io.Writer.
func (p *SyncPrinter) WriteMessage(topic string, bytes []byte) {
// WriteEvent writes the event content to the relevant message io.Writer.
func (p *SyncPrinter) WriteEvent(topic string, bytes []byte) {
if len(bytes) == 0 {
return
}
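The only change to the Printer contract is the rename of WriteMessage to WriteEvent. As an illustration, the sketch below shows a minimal type satisfying the renamed interface; VerbosityLevel is redefined locally so the example compiles, and the consolePrinter type is hypothetical, not part of the repository.

package main

import "fmt"

// VerbosityLevel stands in for internal.VerbosityLevel from the repo.
type VerbosityLevel int

// Printer matches the interface after this commit (WriteEvent instead of WriteMessage).
type Printer interface {
	Logf(level VerbosityLevel, format string, args ...interface{})
	Log(level VerbosityLevel, msg string)
	WriteEvent(topic string, bytes []byte)
	Level() VerbosityLevel
	Close()
}

// consolePrinter is an illustrative implementation that writes everything to stdout.
type consolePrinter struct{ level VerbosityLevel }

func (p *consolePrinter) Logf(_ VerbosityLevel, format string, args ...interface{}) {
	fmt.Printf(format+"\n", args...)
}
func (p *consolePrinter) Log(_ VerbosityLevel, msg string) { fmt.Println(msg) }
func (p *consolePrinter) WriteEvent(topic string, bytes []byte) {
	fmt.Printf("[%s] %s\n", topic, bytes)
}
func (p *consolePrinter) Level() VerbosityLevel { return p.level }
func (p *consolePrinter) Close()                {}

var _ Printer = (*consolePrinter)(nil) // compile-time check that the interface is satisfied

func main() {
	var p Printer = &consolePrinter{}
	p.WriteEvent("orders", []byte(`{"id":1}`))
	p.Close()
}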
66 changes: 32 additions & 34 deletions kafka/consumer.go
@@ -29,9 +29,7 @@ type Consumer struct {
enableAutoTopicCreation bool
environment string
events chan *Event

mux sync.Mutex
isClosed bool
closeOnce sync.Once
}

// NewConsumer creates a new instance of Kafka cluster consumer.
@@ -132,7 +130,6 @@ func (c *Consumer) Start(ctx context.Context, topics map[string]*Checkpoint) err
defer store.close()
store.start(topicPartitionOffsets)
}

c.consumeTopics(ctx, topicPartitionOffsets)
return nil
}
@@ -198,20 +195,26 @@ func (c *Consumer) consumeTopics(ctx context.Context, topicPartitionOffsets map[

// Close closes the Kafka consumer.
func (c *Consumer) Close() {
c.mux.Lock()
defer c.mux.Unlock()
if c.isClosed || c.internalConsumer == nil {
return
}
c.printer.Log(internal.Verbose, "Closing Kafka consumer.")
c.isClosed = true
err := c.internalConsumer.Close()
if err != nil {
c.printer.Logf(internal.Forced, "Failed to close Kafka client: %s.", err)
} else {
c.printer.Log(internal.VeryVerbose, "The Kafka client has been closed successfully.")
c.closeOnce.Do(func() {
c.printer.Log(internal.Verbose, "Closing Kafka consumer.")
err := c.internalConsumer.Close()
if err != nil {
c.printer.Logf(internal.Forced, "Failed to close Kafka client: %s.", err)
} else {
c.printer.Log(internal.VeryVerbose, "The Kafka client has been closed successfully.")
}
close(c.events)
})
}

// StoreOffset stores the offset of the successfully processed message into the offset store.
func (c *Consumer) StoreOffset(event *Event) {
if c.config.OffsetStore != nil {
err := c.config.OffsetStore.Store(event.Topic, event.Partition, event.Offset+1)
if err != nil {
c.printer.Logf(internal.Forced, "Failed to store the offset: %s.", err)
}
}
close(c.events)
}
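Close is now guarded by sync.Once instead of a mutex plus an isClosed flag, so concurrent or repeated calls cannot close the events channel twice. A minimal, self-contained sketch of that pattern follows; the consumer type and field names are illustrative, not the repository's.

package main

import (
	"fmt"
	"sync"
)

type consumer struct {
	events    chan string
	closeOnce sync.Once
}

// Close is safe to call from multiple goroutines: the body runs exactly once,
// so the channel is never closed twice and the log line is printed once.
func (c *consumer) Close() {
	c.closeOnce.Do(func() {
		fmt.Println("closing consumer")
		close(c.events)
	})
}

func main() {
	c := &consumer{events: make(chan string)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // only the first call actually closes the channel
		}()
	}
	wg.Wait()
}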

func (c *Consumer) consumePartition(ctx context.Context, topic string, partition int32, offset int64) error {
@@ -223,27 +226,22 @@ func (c *Consumer) consumePartition(ctx context.Context, topic string, partition

c.wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer c.wg.Done()
defer func() {
pc.AsyncClose()
c.wg.Done()
}()
for m := range pc.Messages() {
select {
case <-ctx.Done():
pc.AsyncClose()
return
default:
c.events <- &Event{
Topic: m.Topic,
Key: m.Key,
Value: m.Value,
Timestamp: m.Timestamp,
Partition: m.Partition,
Offset: m.Offset,
}
if c.config.OffsetStore != nil {
err := c.config.OffsetStore.Store(m.Topic, m.Partition, m.Offset+1)
if err != nil {
c.printer.Logf(internal.Forced, "Failed to store the offset: %s.", err)
}
}
case c.events <- &Event{
Topic: m.Topic,
Key: m.Key,
Value: m.Value,
Timestamp: m.Timestamp,
Partition: m.Partition,
Offset: m.Offset,
}:
}
}
}(pc)
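The rewritten loop makes the channel send itself a select case next to ctx.Done(), so a blocked send can always be abandoned when the context is cancelled, and AsyncClose moves into a deferred cleanup. The sketch below shows only the send-or-cancel part of that pattern with illustrative names; it is not the repository's consumer code.

package main

import (
	"context"
	"fmt"
	"time"
)

// pump sends an increasing counter on events until ctx is cancelled.
// Because the send is a select case, the goroutine can never be stuck
// forever on a reader that has gone away.
func pump(ctx context.Context, events chan<- int) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			fmt.Println("cancelled, stopping producer")
			return
		case events <- i: // blocks until a reader is ready or ctx is cancelled
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan int)
	go pump(ctx, events)
	fmt.Println(<-events, <-events)
	cancel()
	time.Sleep(50 * time.Millisecond) // give pump a moment to observe the cancellation
}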
19 changes: 12 additions & 7 deletions kafka/local_offset_store.go
@@ -80,12 +80,6 @@ func (s *localOffsetStore) start(loaded map[string]PartitionOffsets) {
}()
}

// Returns the channel on which the write errors will be received.
// You must listen to this channel to avoid deadlock.
func (s *localOffsetStore) errors() <-chan error {
return s.writeErrors
}

// Store saves the topic offset to the local disk.
func (s *localOffsetStore) Store(topic string, partition int32, offset int64) error {
s.in <- &progress{
@@ -116,6 +110,12 @@ func (s *localOffsetStore) Query(topic string) (PartitionOffsets, error) {
return offsets, nil
}

// Returns the channel on which the write errors will be received.
// You must listen to this channel to avoid deadlock.
func (s *localOffsetStore) errors() <-chan error {
return s.writeErrors
}

func (s *localOffsetStore) close() {
if s == nil || s.db == nil {
return
Expand All @@ -141,7 +141,12 @@ func (s *localOffsetStore) writeOffsetsToDisk(offsets map[string]PartitionOffset
return
}
s.checksum[cs] = nil
s.printer.Logf(internal.SuperVerbose, "Writing the offset(s) of topic %s to the disk %s.", topic, cs)
s.printer.Logf(internal.SuperVerbose, "Writing the offset(s) of topic %s to the disk.", topic)
for p, o := range offsets {
if o >= 0 {
s.printer.Logf(internal.Chatty, " P%02d: %d", p, o)
}
}
err = s.db.Write(topic, buff)
if err != nil {
s.writeErrors <- errors.Wrapf(err, "Failed to write the offsets of topic %s to the disk %s", topic, cs)
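The surrounding writeOffsetsToDisk code keeps a set of checksums of the serialized offsets (s.checksum[cs] = nil above), which suggests that unchanged snapshots are skipped before hitting the disk; the change in this hunk replaces the checksum in the log line with a per-partition offset dump at the Chatty level. The sketch below illustrates that skip-identical-writes idea under that assumption; the JSON serialization, SHA-256 hashing, and in-memory set are stand-ins for illustration, not the repository's exact implementation.

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

// PartitionOffsets maps a partition number to its stored offset.
type PartitionOffsets map[int32]int64

type store struct {
	seen map[string]struct{} // checksums of snapshots already written
}

// write persists the offsets of a topic unless an identical snapshot
// has already been written, in which case the disk I/O is skipped.
func (s *store) write(topic string, offsets PartitionOffsets) error {
	buf, err := json.Marshal(offsets)
	if err != nil {
		return err
	}
	cs := fmt.Sprintf("%x", sha256.Sum256(buf))
	if _, ok := s.seen[cs]; ok {
		return nil // unchanged since the last write
	}
	s.seen[cs] = struct{}{}
	fmt.Printf("writing offsets of topic %s: %s\n", topic, buf)
	// a real implementation would write buf to the local database here
	return nil
}

func main() {
	s := &store{seen: map[string]struct{}{}}
	offsets := PartitionOffsets{0: 42, 1: 7}
	_ = s.write("orders", offsets)
	_ = s.write("orders", offsets) // skipped: identical checksum
}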