Skip to content

Commit

Permalink
Add an enabled flag to filter config for better control.
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed May 3, 2024
1 parent c0967d1 commit 5ccccee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
4 changes: 3 additions & 1 deletion config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ request_timeout = "100ms"

# Custom go-plugin filter to load to filter messages when relaying
[filters.test]
enabled = false
path = "test.bin"
config = '''
{
"address": ["127.0.0.1:6379"],
Expand All @@ -130,4 +132,4 @@ config = '''
"write_timeout": 3000,
"idle_timeout": 30000
}
'''
'''
21 changes: 14 additions & 7 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"log/slog"
"net/http"
"os"
"path/filepath"
"plugin"
"strconv"
"strings"
Expand Down Expand Up @@ -196,13 +195,21 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC

// initFilters loads the go plugin, initializes it and return a map of filter plugins.
func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, error) {
if ko.String("mode") != "single" {
log.Fatalf("filters can only be used in `single` mode.")
}

out := make(map[string]filter.Provider)
for _, name := range ko.MapKeys("filters") {
plg, err := plugin.Open(name)
for _, id := range ko.MapKeys("filters") {
if !ko.Bool("filters." + id + ".enabled") {
continue
}

path := ko.String("filters." + id + ".path")
plg, err := plugin.Open(path)
if err != nil {
return nil, fmt.Errorf("error loading provider plugin '%s': %v", name, err)
return nil, fmt.Errorf("error loading provider plugin '%s': %s: %v", id, path, err)
}
id := strings.TrimSuffix(filepath.Base(name), filepath.Ext(name))

newFunc, err := plg.Lookup("New")
if err != nil {
Expand All @@ -215,7 +222,7 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider,

var cfg filter.Config
if err := ko.Unmarshal("filter."+id, &cfg); err != nil {
log.Fatalf("error unmarshalling filter config: %s: %v", name, err)
log.Fatalf("error unmarshalling filter config: %s: %v", id, err)
}
if cfg.Config == "" {
lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id))
Expand All @@ -226,7 +233,7 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider,
if err != nil {
return nil, fmt.Errorf("error initializing filter provider plugin '%s': %v", id, err)
}
lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, name))
lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, id))

p, ok := prov.(filter.Provider)
if !ok {
Expand Down

0 comments on commit 5ccccee

Please sign in to comment.