diff --git a/.goreleaser.yml b/.goreleaser.yml index f7761a8..23d97d8 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -4,16 +4,18 @@ before: builds: - env: - - CGO_ENABLED=1 + - CGO_ENABLED=0 binary: kaf-relay goos: - linux goarch: - amd64 + ldflags: + - -s -w -X "main.buildString={{ .Tag }} ({{ .ShortCommit }} {{ .Date }})" -X "main.versionString={{ .Tag }}" archives: - format: tar.gz files: - config.toml.sample - README.md - - LICENSE \ No newline at end of file + - LICENSE diff --git a/Makefile b/Makefile index 71e2c23..4e0913c 100644 --- a/Makefile +++ b/Makefile @@ -1,18 +1,18 @@ -# Git version for injecting into Go bins. -LAST_COMMIT := $(shell git rev-parse --short HEAD) -LAST_COMMIT_DATE := $(shell git show -s --format=%ci ${LAST_COMMIT}) -VERSION := $(shell git describe --tags) -BUILDSTR := ${VERSION} (Commit: ${LAST_COMMIT_DATE} (${LAST_COMMIT}), Build: $(shell date +"%Y-%m-%d% %H:%M:%S %z")) +# Try to get the commit hash from 1) git 2) the VERSION file 3) fallback. +LAST_COMMIT := $(or $(shell git rev-parse --short HEAD 2> /dev/null),$(shell head -n 1 VERSION | grep -oP -m 1 "^[a-z0-9]+$$"),"") -BIN := kaf-relay.bin -DIST := dist +# Try to get the semver from 1) git 2) the VERSION file 3) fallback. +VERSION := $(or $(KAF_RELAY_VERSION),$(shell git describe --tags --abbrev=0 2> /dev/null),$(shell grep -oP 'tag: \Kv\d+\.\d+\.\d+(-[a-zA-Z0-9.-]+)?' 
VERSION),"v0.0.0") +BUILDSTR := ${VERSION} (\#${LAST_COMMIT} $(shell date -u +"%Y-%m-%dT%H:%M:%S%z")) -.PHONY: dist -dist: - mkdir -p ${DIST} - CGO_ENABLED=1 go build -o ${BIN} --ldflags="-X 'main.buildString=${BUILDSTR}'" - cp ${BIN} ${DIST} +BIN := kaf-relay + +.PHONY: build +build: $(BIN) + +$(BIN): + CGO_ENABLED=0 go build -o ${BIN} --ldflags="-X 'main.buildString=${BUILDSTR}'" .PHONY: clean clean: - rm -rf ${DIST} \ No newline at end of file + rm -rf ${BIN} diff --git a/consumer.go b/consumer.go index a7ce6eb..4cfddca 100644 --- a/consumer.go +++ b/consumer.go @@ -25,7 +25,7 @@ type consumer struct { offsetMgr *offsetManager nodeTracker *NodeTracker - l *slog.Logger + log *slog.Logger } // offsetManager is a holder for the topic offsets. @@ -74,27 +74,27 @@ func (c *consumer) GetHealthy(ctx context.Context) (int, error) { // reinit reinitializes the consumer group func (c *consumer) Connect(ctx context.Context, cfg ConsumerGroupCfg) error { - c.l.Debug("reinitializing consumer group", "broker", cfg.BootstrapBrokers) + c.log.Debug("reinitializing consumer group", "broker", cfg.BootstrapBrokers) // tcp health check if ok := healthcheck(ctx, cfg.BootstrapBrokers, c.maxReqTime); !ok { return ErrorNoHealthy } - cl, err := initConsumerGroup(ctx, cfg, c.l) + cl, err := initConsumerGroup(ctx, cfg, c.log) if err != nil { return err } offsets := c.GetOffsets() if offsets != nil { - err = leaveAndResetOffsets(ctx, cl, cfg, offsets, c.l) + err = leaveAndResetOffsets(ctx, cl, cfg, offsets, c.log) if err != nil { - c.l.Error("error resetting offsets", "err", err) + c.log.Error("error resetting offsets", "err", err) return err } - cl, err = initConsumerGroup(ctx, cfg, c.l) + cl, err = initConsumerGroup(ctx, cfg, c.log) if err != nil { return err } diff --git a/go.mod b/go.mod index eb35bda..75eb53f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/knadh/koanf/parsers/toml v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 + 
github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.0.1 github.com/spf13/pflag v1.0.5 github.com/twmb/franz-go v1.15.4 diff --git a/go.sum b/go.sum index be49332..9a947a5 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6OD github.com/knadh/koanf/parsers/toml v0.1.0/go.mod h1:yUprhq6eo3GbyVXFFMdbfZSo928ksS+uo0FFqNMnO18= github.com/knadh/koanf/providers/file v0.1.0 h1:fs6U7nrV58d3CFAFh8VTde8TM262ObYf3ODrc//Lp+c= github.com/knadh/koanf/providers/file v0.1.0/go.mod h1:rjJ/nHQl64iYCtAW2QQnF0eSmDEX/YZ/eNFj5yR6BvA= +github.com/knadh/koanf/providers/posflag v0.1.0 h1:mKJlLrKPcAP7Ootf4pBZWJ6J+4wHYujwipe7Ie3qW6U= +github.com/knadh/koanf/providers/posflag v0.1.0/go.mod h1:SYg03v/t8ISBNrMBRMlojH8OsKowbkXV7giIbBVgbz0= github.com/knadh/koanf/v2 v2.0.1 h1:1dYGITt1I23x8cfx8ZnldtezdyaZtfAuRtIFOiRzK7g= github.com/knadh/koanf/v2 v2.0.1/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= diff --git a/initz.go b/initz.go index 4f36c99..0855226 100644 --- a/initz.go +++ b/initz.go @@ -1,107 +1,91 @@ package main import ( + "bytes" "context" "fmt" + "log" "log/slog" + "net/http" "os" "path/filepath" "plugin" "strings" + "time" "github.com/VictoriaMetrics/metrics" + "github.com/knadh/koanf/parsers/toml" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/providers/posflag" "github.com/knadh/koanf/v2" + flag "github.com/spf13/pflag" "github.com/twmb/franz-go/pkg/kgo" "github.com/zerodha/kaf-relay/filter" ) -// getProducerClient returns a kafka producer client. 
-func getProducerClient(ctx context.Context, cfg ProducerCfg, bCfg BackoffCfg, l *slog.Logger) (*kgo.Client, error) { - opts := []kgo.Opt{ - kgo.ProduceRequestTimeout(cfg.SessionTimeout), - kgo.RecordDeliveryTimeout(cfg.SessionTimeout), // break the :ProduceSync if it takes too long - kgo.ProducerBatchMaxBytes(int32(cfg.MaxMessageBytes)), - kgo.MaxBufferedRecords(cfg.FlushBatchSize), - kgo.ProducerLinger(cfg.FlushFrequency), - kgo.ProducerBatchCompression(getCompressionCodec(cfg.Compression)), - kgo.SeedBrokers(cfg.BootstrapBrokers...), - kgo.RequiredAcks(getAckPolicy(cfg.CommitAck)), +func initConfig() (*koanf.Koanf, Config) { + // Initialize config + f := flag.NewFlagSet("config", flag.ContinueOnError) + f.Usage = func() { + fmt.Println(f.FlagUsages()) + os.Exit(0) } - // TCPAck/LeaderAck requires kafka deduplication to be turned off - if !cfg.EnableIdempotency { - opts = append(opts, kgo.DisableIdempotentWrite()) + f.StringSlice("config", []string{"config.toml"}, "path to one or more config files (will be merged in order)") + f.String("mode", "single", "single | failover") + f.Bool("stop-at-end", false, "stop relay at the end of offsets") + f.StringSlice("filter", []string{}, "path to one or more filter providers") + f.StringSlice("topic", []string{}, "one or more source:target topic names. Setting this overrides [topics] in the config file.") + f.Bool("version", false, "show current version of the build") + + if err := f.Parse(os.Args[1:]); err != nil { + log.Fatalf("error loading flags: %v", err) } - opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) + ko := koanf.New(".") + if err := ko.Load(posflag.Provider(f, ".", ko), nil); err != nil { + log.Fatalf("error reading flag config: %v", err) + } - // Add authentication - if cfg.EnableAuth { - opts = appendSASL(opts, cfg.ClientCfg) + // Version flag. 
+ if ko.Bool("version") { + fmt.Println(buildString) + os.Exit(0) } - if cfg.EnableTLS { - if cfg.CACertPath == "" && cfg.ClientCertPath == "" && cfg.ClientKeyPath == "" { - opts = append(opts, kgo.DialTLS()) - } else { - tlsOpt, err := createTLSConfig(cfg.CACertPath, cfg.ClientCertPath, cfg.ClientKeyPath) - if err != nil { - return nil, err - } + if ko.Bool("stop-at-end") && ko.String("mode") == ModeFailover { + log.Fatalf("`--stop-at-end` cannot be used with `failover` mode") + } - // Set up TLS configuration - opts = append(opts, tlsOpt) + // Load one or more config files. Keys in each subsequent file is merged + // into the previous file's keys. + for _, f := range ko.Strings("config") { + log.Printf("reading config from %s", f) + if err := ko.Load(file.Provider(f), toml.Parser()); err != nil { + log.Fatalf("error reading config: %v", err) } } - var ( - retries = 0 - backoff = getBackoffFn(bCfg) - err error - cl *kgo.Client - ) - - // retry until we can connect to kafka -outerLoop: - for retries < cfg.MaxRetries || cfg.MaxRetries == IndefiniteRetry { - select { - case <-ctx.Done(): - break outerLoop - default: - cl, err = kgo.NewClient(opts...) - if err != nil { - l.Error("error creating producer client", "err", err) - retries++ - waitTries(ctx, backoff(retries)) - continue - } - - // Get the destination topics - var topics []string - for _, v := range cfg.Topics { - topics = append(topics, v) - } + var cfg Config + if err := ko.Unmarshal("", &cfg); err != nil { + log.Fatalf("error marshalling application config: %v", err) + } - // test connectivity and ensures destination topics exists. - err = testConnection(cl, cfg.SessionTimeout, topics, cfg.TopicsPartition) - if err != nil { - l.Error("error connecting to producer", "err", err) - retries++ - waitTries(ctx, backoff(retries)) - continue + // If there are topics in the commandline flags, override the ones read from the file. 
+ if topics := ko.Strings("topic"); len(topics) > 0 { + mp := map[string]string{} + for _, t := range topics { + split := strings.Split(t, ":") + if len(split) != 2 { + log.Fatalf("invalid topic '%s'. Should be in the format 'source:target'", t) } - if err == nil { - break outerLoop - } + mp[split[0]] = split[1] } + cfg.Topics = mp } - if err != nil { - return nil, err - } - - return cl, nil + return ko, cfg } // initProducer initializes the kafka producer client. @@ -160,7 +144,6 @@ func initConsumerGroup(ctx context.Context, cfg ConsumerGroupCfg, l *slog.Logger opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil))) } - // Add authentication if cfg.EnableAuth { opts = appendSASL(opts, cfg.ClientCfg) } @@ -244,7 +227,123 @@ func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map return out, nil } -// getClient returns franz-go client with default config +func initMetricsServer(relay *Relay, addr string) *http.Server { + mux := http.NewServeMux() + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + buf := new(bytes.Buffer) + relay.getMetrics(buf) + + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + buf.WriteTo(w) + }) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + go func() { + err := srv.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.Printf("error starting server: %v", err) + } + }() + + return srv +} + +// getProducerClient returns a Kafka producer client. 
+func getProducerClient(ctx context.Context, cfg ProducerCfg, bCfg BackoffCfg, l *slog.Logger) (*kgo.Client, error) { + opts := []kgo.Opt{ + kgo.ProduceRequestTimeout(cfg.SessionTimeout), + kgo.RecordDeliveryTimeout(cfg.SessionTimeout), // break the :ProduceSync if it takes too long + kgo.ProducerBatchMaxBytes(int32(cfg.MaxMessageBytes)), + kgo.MaxBufferedRecords(cfg.FlushBatchSize), + kgo.ProducerLinger(cfg.FlushFrequency), + kgo.ProducerBatchCompression(getCompressionCodec(cfg.Compression)), + kgo.SeedBrokers(cfg.BootstrapBrokers...), + kgo.RequiredAcks(getAckPolicy(cfg.CommitAck)), + } + + // TCPAck/LeaderAck requires Kafka deduplication to be turned off. + if !cfg.EnableIdempotency { + opts = append(opts, kgo.DisableIdempotentWrite()) + } + + opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) + + if cfg.EnableAuth { + opts = appendSASL(opts, cfg.ClientCfg) + } + + if cfg.EnableTLS { + if cfg.CACertPath == "" && cfg.ClientCertPath == "" && cfg.ClientKeyPath == "" { + opts = append(opts, kgo.DialTLS()) + } else { + tlsOpt, err := createTLSConfig(cfg.CACertPath, cfg.ClientCertPath, cfg.ClientKeyPath) + if err != nil { + return nil, err + } + + // Set up TLS configuration + opts = append(opts, tlsOpt) + } + } + + var ( + retries = 0 + backoff = getBackoffFn(bCfg) + err error + cl *kgo.Client + ) + + // Retry until a successful connection. +outerLoop: + for retries < cfg.MaxRetries || cfg.MaxRetries == IndefiniteRetry { + select { + case <-ctx.Done(): + break outerLoop + default: + cl, err = kgo.NewClient(opts...) + if err != nil { + l.Error("error creating producer client", "err", err) + retries++ + waitTries(ctx, backoff(retries)) + continue + } + + // Get the destination topics + var topics []string + for _, v := range cfg.Topics { + topics = append(topics, v) + } + + // Test connectivity and ensure destination topics exists. 
+ err = testConnection(cl, cfg.SessionTimeout, topics, cfg.TopicsPartition) + if err != nil { + l.Error("error connecting to producer", "err", err) + retries++ + waitTries(ctx, backoff(retries)) + continue + } + + if err == nil { + break outerLoop + } + } + } + + if err != nil { + return nil, err + } + + return cl, nil +} + +// getClient returns franz-go client with default config. func getClient(cfg ConsumerGroupCfg) (*kgo.Client, error) { opts := []kgo.Opt{ kgo.SeedBrokers(cfg.BootstrapBrokers...), @@ -256,7 +355,6 @@ func getClient(cfg ConsumerGroupCfg) (*kgo.Client, error) { opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil))) } - // Add authentication if cfg.EnableAuth { opts = appendSASL(opts, cfg.ClientCfg) } diff --git a/main.go b/main.go index 0244662..a39e1ab 100644 --- a/main.go +++ b/main.go @@ -1,22 +1,14 @@ package main import ( - "bytes" "context" - "fmt" "log" "log/slog" - "net/http" "os" "os/signal" "syscall" - "time" "github.com/VictoriaMetrics/metrics" - "github.com/knadh/koanf/parsers/toml" - "github.com/knadh/koanf/providers/file" - "github.com/knadh/koanf/v2" - flag "github.com/spf13/pflag" ) var ( @@ -24,98 +16,54 @@ var ( ) func main() { - // Initialize config - f := flag.NewFlagSet("config", flag.ContinueOnError) - f.Usage = func() { - fmt.Println(f.FlagUsages()) - os.Exit(0) - } - - var ( - configPath string - mode string - stopAtEnd bool - filterPaths []string - ) - f.StringVar(&configPath, "config", "config.toml", "Path to the TOML configuration file") - f.StringVar(&mode, "mode", "single", "single/failover") - f.BoolVar(&stopAtEnd, "stop-at-end", false, "Stop relay at the end of offsets") - f.StringSliceVar(&filterPaths, "filter", []string{}, "Path to filter providers. Can specify multiple values.") - f.Bool("version", false, "Current version of the build") - - if err := f.Parse(os.Args[1:]); err != nil { - log.Fatalf("error loading flags: %v", err) - } - - // Version flag. 
- if ok, _ := f.GetBool("version"); ok { - fmt.Println(buildString) - os.Exit(0) - } - - if stopAtEnd && mode == ModeFailover { - log.Fatalf("`--stop-at-end` cannot be used with `failover` mode") - } - - // Load the config file. - ko := koanf.New(".") - log.Printf("reading config: %s", configPath) - if err := ko.Load(file.Provider(configPath), toml.Parser()); err != nil { - log.Fatalf("error reading config: %v", err) - } + ko, cfg := initConfig() - var cfg Config - if err := ko.Unmarshal("", &cfg); err != nil { - log.Fatalf("error marshalling application config: %v", err) - } - - // setup logger + // Initialized the structured logger. logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ AddSource: false, Level: cfg.App.LogLevel, })) - // setup filter providers - filters, err := initFilterProviders(filterPaths, ko, logger) + // Load the optional filter providers. + filters, err := initFilterProviders(ko.Strings("filter"), ko, logger) if err != nil { log.Fatalf("error initializing filter provider: %v", err) } - // Assign topic mapping - var topics []string + // Set source consumer topics. + var srcTopics []string for t := range cfg.Topics { - topics = append(topics, t) + srcTopics = append(srcTopics, t) } - - // Set consumer topics for i := 0; i < len(cfg.Consumers); i++ { - cfg.Consumers[i].Topics = topics + cfg.Consumers[i].Topics = srcTopics } + + // Set src:target topic map on the producer. cfg.Producer.Topics = cfg.Topics - // Create context with interrupts signals + // Create context with interrupts signals. ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - var ( - metr = metrics.NewSet() - ) + // Initialize metrics. + metr := metrics.NewSet() - // setup producer + // Initialize the producer. 
prod, err := initProducer(ctx, cfg.Producer, cfg.App.Backoff, metr, logger) if err != nil { log.Fatalf("error starting producer: %v", err) } prod.maxReqTime = cfg.App.MaxRequestDuration - // setup offsets manager with the destination offsets + // Setup offsets manager with the destination offsets. destOffsets, err := prod.GetEndOffsets(ctx, cfg.App.MaxRequestDuration) if err != nil { log.Fatalf("error fetching destination offsets: %v", err) } offsetMgr := &offsetManager{Offsets: destOffsets.KOffsets()} - // setup consumer hook, consumer + // Initialize the consumers. hookCh := make(chan struct{}, 1) var n = make([]Node, len(cfg.Consumers)) for i := 0; i < len(cfg.Consumers); i++ { @@ -125,19 +73,18 @@ func main() { } } - cons := &consumer{ - client: nil, // init during track healthy - cfgs: cfg.Consumers, - maxReqTime: cfg.App.MaxRequestDuration, - backoffCfg: cfg.App.Backoff, - offsetMgr: offsetMgr, - nodeTracker: NewNodeTracker(n), - l: logger, - } + // Initialize the Relay. + relay := &Relay{ + consumer: &consumer{ + client: nil, // init during track healthy + cfgs: cfg.Consumers, + maxReqTime: cfg.App.MaxRequestDuration, + backoffCfg: cfg.App.Backoff, + offsetMgr: offsetMgr, + nodeTracker: NewNodeTracker(n), + log: logger, + }, - // setup relay - relay := relay{ - consumer: cons, producer: prod, unhealthyCh: hookCh, @@ -153,7 +100,7 @@ func main() { lagThreshold: cfg.App.LagThreshold, maxReqTime: cfg.App.MaxRequestDuration, - stopAtEnd: stopAtEnd, + stopAtEnd: ko.Bool("stop-at-end"), srcOffsets: make(map[string]map[int32]int64), destOffsets: destOffsets.KOffsets(), @@ -162,41 +109,16 @@ func main() { nodeCh: make(chan int, 1), } - // Start metrics handler - mux := http.NewServeMux() - mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - buf := new(bytes.Buffer) - relay.getMetrics(buf) - - w.Header().Set("Content-Type", "text/plain") - w.WriteHeader(http.StatusOK) - buf.WriteTo(w) - }) - - srv := http.Server{ - Addr: 
cfg.App.MetricsServerAddr, - Handler: mux, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - } - - go func() { - err := srv.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - log.Printf("error starting server: %v", err) - } - }() + // Start metrics HTTP server. + metrics := initMetricsServer(relay, cfg.App.MetricsServerAddr) - // Start forwarder daemon + // Start the blocking relay. if err := relay.Start(ctx); err != nil { - relay.logger.Error("error starting relay", "err", err) + log.Fatalf("error starting relay: %v", err) } - // shutdown server - srv.Shutdown(ctx) - - // close underlying client connections + metrics.Shutdown(ctx) relay.Close() - logger.Info("done!") + logger.Info("done") } diff --git a/relay.go b/relay.go index b9fad78..415897b 100644 --- a/relay.go +++ b/relay.go @@ -14,8 +14,8 @@ import ( "github.com/zerodha/kaf-relay/filter" ) -// relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another. -type relay struct { +// Relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another. 
+type Relay struct { consumer *consumer producer *producer @@ -52,12 +52,12 @@ type relay struct { } // getMetrics writes the internal prom metrics to the given io.Writer -func (r *relay) getMetrics(buf io.Writer) { +func (r *Relay) getMetrics(buf io.Writer) { r.metrics.WritePrometheus(buf) } // Start starts the consumer loop on kafka (A), fetch messages and relays over to kafka (B) using an async -func (r *relay) Start(ctx context.Context) error { +func (r *Relay) Start(ctx context.Context) error { // wait for all goroutines to exit wg := &sync.WaitGroup{} defer wg.Wait() @@ -109,14 +109,14 @@ func (r *relay) Start(ctx context.Context) error { } // Close close the underlying kgo.Client(s) -func (r *relay) Close() { +func (r *Relay) Close() { r.logger.Debug("closing relay consumer, producer...") r.consumer.Close() r.producer.Close() } // startConsumerWorker starts the consumer worker which polls the kafka cluster for messages. -func (r *relay) startConsumerWorker(ctx context.Context) error { +func (r *Relay) startConsumerWorker(ctx context.Context) error { backoff := getBackoffFn(r.backoffCfg) pollCtx, pollCancelFn := context.WithCancel(context.Background()) @@ -255,7 +255,7 @@ pollLoop: } // processMessage processes the given message and forwards it to the producer batch channel. -func (r *relay) processMessage(ctx context.Context, rec *kgo.Record) error { +func (r *Relay) processMessage(ctx context.Context, rec *kgo.Record) error { // Decrement the end offsets for the given topic and partition till we reach 0 if r.stopAtEnd { r.decrementSourceOffset(rec.Topic, rec.Partition) @@ -302,7 +302,7 @@ func (r *relay) processMessage(ctx context.Context, rec *kgo.Record) error { } // trackHealthy tracks the healthy node in the cluster and checks if the lag threshold is exceeded. 
-func (r *relay) trackHealthy(ctx context.Context) error { +func (r *Relay) trackHealthy(ctx context.Context) error { tick := time.NewTicker(r.nodeHealthCheckFreq) defer tick.Stop() @@ -425,7 +425,7 @@ func (r *relay) trackHealthy(ctx context.Context) error { } // decrementSourceOffset decrements the offset count for the given topic and partition in the source offsets map. -func (r *relay) decrementSourceOffset(topic string, partition int32) { +func (r *Relay) decrementSourceOffset(topic string, partition int32) { if topicOffsets, ok := r.srcOffsets[topic]; ok { if offset, found := topicOffsets[partition]; found && offset > 0 { topicOffsets[partition]-- @@ -435,7 +435,7 @@ func (r *relay) decrementSourceOffset(topic string, partition int32) { } // setSourceOffsets sets the end offsets of the consumer during bootup to exit on consuming everything. -func (r *relay) setSourceOffsets(of kadm.ListedOffsets) { +func (r *Relay) setSourceOffsets(of kadm.ListedOffsets) { of.Each(func(lo kadm.ListedOffset) { ct, ok := r.srcOffsets[lo.Topic] if !ok {