diff --git a/config.sample.toml b/config.sample.toml index 2aa5301..8562751 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -117,6 +117,8 @@ request_timeout = "100ms" # Custom go-plugin filter to load to filter messages when relaying [filters.test] +enabled = false +path = "test.bin" config = ''' { "address": ["127.0.0.1:6379"], @@ -130,4 +132,4 @@ config = ''' "write_timeout": 3000, "idle_timeout": 30000 } -''' \ No newline at end of file +''' diff --git a/init.go b/init.go index 1f4a58e..510e9d0 100644 --- a/init.go +++ b/init.go @@ -8,7 +8,6 @@ import ( "log/slog" "net/http" "os" - "path/filepath" "plugin" "strconv" "strings" @@ -196,13 +195,21 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC // initFilters loads the go plugin, initializes it and return a map of filter plugins. func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, error) { + if ko.String("mode") != "single" { + log.Fatalf("filters can only be used in `single` mode.") + } + out := make(map[string]filter.Provider) - for _, name := range ko.MapKeys("filters") { - plg, err := plugin.Open(name) + for _, id := range ko.MapKeys("filters") { + if !ko.Bool("filters." + id + ".enabled") { + continue + } + + path := ko.String("filters." + id + ".path") + plg, err := plugin.Open(path) if err != nil { - return nil, fmt.Errorf("error loading provider plugin '%s': %v", name, err) + return nil, fmt.Errorf("error loading provider plugin '%s': %s: %v", id, path, err) } - id := strings.TrimSuffix(filepath.Base(name), filepath.Ext(name)) newFunc, err := plg.Lookup("New") if err != nil { @@ -215,7 +222,7 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, var cfg filter.Config if err := ko.Unmarshal("filter."+id, &cfg); err != nil { - log.Fatalf("error unmarshalling filter config: %s: %v", name, err) + log.Fatalf("error unmarshalling filter config: %s: %v", id, err) } if cfg.Config == "" { lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id)) @@ -226,7 +233,7 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, if err != nil { return nil, fmt.Errorf("error initializing filter provider plugin '%s': %v", id, err) } - lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, name)) + lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, id)) p, ok := prov.(filter.Provider) if !ok {