diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 7185b5ce8a..63009f9366 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -27,6 +27,7 @@ import ( promconfig "github.com/prometheus/prometheus/config" _ "github.com/prometheus/prometheus/discovery/install" "github.com/spf13/pflag" + "github.com/zeebo/xxh3" "gopkg.in/yaml.v2" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -76,24 +77,40 @@ type CLIConfig struct { PromCRWatcherConf PrometheusCRWatcherConfig } +// Load loads the target allocator configuration from the given file. func Load(file string) (Config, error) { var cfg Config - if err := unmarshal(&cfg, file); err != nil { + if _, err := unmarshal(&cfg, file, false); err != nil { return Config{}, err } return cfg, nil } -func unmarshal(cfg *Config, configFile string) error { +// LoadForPromCfg loads only the Prometheus configuration from the given file. +// It also returns a hash of the raw file contents so callers can detect changes. 
+func LoadForPromCfg(file string) (*promconfig.Config, uint64, error) { + var cfg Config + hash, err := unmarshal(&cfg, file, true) + if err != nil { + return nil, 0, err + } + return cfg.Config, hash, nil +} +func unmarshal(cfg *Config, configFile string, withHash bool) (uint64, error) { yamlFile, err := os.ReadFile(configFile) if err != nil { - return err + return 0, err } if err = yaml.UnmarshalStrict(yamlFile, cfg); err != nil { - return fmt.Errorf("error unmarshaling YAML: %w", err) + return 0, fmt.Errorf("error unmarshaling YAML: %w", err) } - return nil + + if !withHash { + return 0, nil + } + + return xxh3.Hash(yamlFile), nil } func ParseCLI() (CLIConfig, error) { diff --git a/cmd/otel-allocator/go.mod b/cmd/otel-allocator/go.mod index 9076b1b344..e34ca9548f 100644 --- a/cmd/otel-allocator/go.mod +++ b/cmd/otel-allocator/go.mod @@ -10,7 +10,6 @@ require ( github.com/gin-gonic/gin v1.9.0 github.com/go-kit/log v0.2.1 github.com/go-logr/logr v1.2.3 - github.com/mitchellh/hashstructure v1.1.0 github.com/oklog/run v1.1.0 github.com/prometheus-operator/prometheus-operator v0.63.0 github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.63.0 @@ -20,6 +19,7 @@ require ( github.com/prometheus/prometheus v1.8.2-0.20211214150951-52c693a63be1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.2 + github.com/zeebo/xxh3 v1.0.2 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.26.1 k8s.io/apimachinery v0.26.1 @@ -129,6 +129,7 @@ require ( github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect github.com/miekg/dns v1.1.50 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/hashstructure v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/cmd/otel-allocator/go.sum b/cmd/otel-allocator/go.sum index ade31422d6..16ef665f96 100644 --- 
a/cmd/otel-allocator/go.sum +++ b/cmd/otel-allocator/go.sum @@ -690,6 +690,9 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8= diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index fbb96e6406..41e81b4a64 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -189,11 +189,14 @@ func main() { select { case event := <-eventChan: eventsMetric.WithLabelValues(event.Source.String()).Inc() - loadConfig, err := event.Watcher.LoadConfig() + loadConfig, changed, err := event.Watcher.LoadConfig() if err != nil { setupLog.Error(err, "Unable to load configuration") continue } + if !changed { + continue + } err = targetDiscoverer.ApplyConfig(event.Source, loadConfig) if err != nil { setupLog.Error(err, "Unable to apply configuration") diff --git a/cmd/otel-allocator/target/discovery.go b/cmd/otel-allocator/target/discovery.go index 9b76399709..25d9b442e8 100644 --- a/cmd/otel-allocator/target/discovery.go +++ b/cmd/otel-allocator/target/discovery.go @@ -16,7 +16,6 @@ package target import ( "github.com/go-logr/logr" - "github.com/mitchellh/hashstructure" "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -41,7 +40,6 @@ type Discoverer struct { configsMap map[allocatorWatcher.EventSource]*config.Config jobToScrapeConfig map[string]*config.ScrapeConfig hook discoveryHook - scrapeConfigsHash uint64 scrapeConfigsUpdater scrapeConfigsUpdater } @@ -79,19 +77,10 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, cfg *confi } } - hash, err := hashstructure.Hash(m.jobToScrapeConfig, nil) - if err != nil { - return err - } - // If the hash has changed, updated stored hash and send the new config. - // Otherwise skip updating scrape configs. - if m.scrapeConfigsUpdater != nil && m.scrapeConfigsHash != hash { - err := m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(m.jobToScrapeConfig) - if err != nil { + if m.scrapeConfigsUpdater != nil && len(m.jobToScrapeConfig) > 0 { + if err := m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(m.jobToScrapeConfig); err != nil { return err } - - m.scrapeConfigsHash = hash } if m.hook != nil { diff --git a/cmd/otel-allocator/target/discovery_test.go b/cmd/otel-allocator/target/discovery_test.go index 6230c958a0..89882e3f22 100644 --- a/cmd/otel-allocator/target/discovery_test.go +++ b/cmd/otel-allocator/target/discovery_test.go @@ -284,10 +284,7 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) { // }, // }, } - var ( - lastValidHash uint64 - lastValidConfig map[string]*promconfig.ScrapeConfig - ) + var lastValidConfig map[string]*promconfig.ScrapeConfig scu := &mockScrapeConfigUpdater{} ctx := context.Background() @@ -299,18 +296,15 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) { err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg) if !tc.expectErr { assert.NoError(t, err) - assert.NotZero(t, manager.scrapeConfigsHash) // Assert that scrape configs in manager are correctly // reflected in the scrape job updater. 
assert.Equal(t, manager.jobToScrapeConfig, scu.mockCfg) - lastValidHash = manager.scrapeConfigsHash lastValidConfig = manager.jobToScrapeConfig } else { // In case of error, assert that we retain the last // known valid config. assert.Error(t, err) - assert.Equal(t, lastValidHash, manager.scrapeConfigsHash) assert.Equal(t, lastValidConfig, manager.jobToScrapeConfig) assert.Equal(t, lastValidConfig, scu.mockCfg) } diff --git a/cmd/otel-allocator/watcher/file.go b/cmd/otel-allocator/watcher/file.go index df561b9188..cba6e5df9f 100644 --- a/cmd/otel-allocator/watcher/file.go +++ b/cmd/otel-allocator/watcher/file.go @@ -29,6 +29,7 @@ var _ Watcher = &FileWatcher{} type FileWatcher struct { logger logr.Logger configFilePath string + lastConfigHash uint64 watcher *fsnotify.Watcher closer chan bool } @@ -48,13 +49,18 @@ func NewFileWatcher(logger logr.Logger, config config.CLIConfig) (*FileWatcher, }, nil } -func (f *FileWatcher) LoadConfig() (*promconfig.Config, error) { - cfg, err := config.Load(f.configFilePath) +func (f *FileWatcher) LoadConfig() (*promconfig.Config, bool, error) { + promCfg, hash, err := config.LoadForPromCfg(f.configFilePath) if err != nil { f.logger.Error(err, "Unable to load configuration") - return nil, err + return nil, false, err } - return cfg.Config, nil + if hash == f.lastConfigHash { + return nil, false, nil + } + + f.lastConfigHash = hash + return promCfg, true, nil } func (f *FileWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error { diff --git a/cmd/otel-allocator/watcher/promOperator.go b/cmd/otel-allocator/watcher/promOperator.go index 92855b5bd2..651e41f9c0 100644 --- a/cmd/otel-allocator/watcher/promOperator.go +++ b/cmd/otel-allocator/watcher/promOperator.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus-operator/prometheus-operator/pkg/prometheus" promconfig "github.com/prometheus/prometheus/config" kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes" + "github.com/zeebo/xxh3" 
"gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -81,6 +82,7 @@ type PrometheusCRWatcher struct { informers map[string]*informers.ForResource stopChannel chan struct{} configGenerator *prometheus.ConfigGenerator + lastConfigHash uint64 kubeConfigPath string serviceMonitorSelector labels.Selector @@ -133,7 +135,7 @@ func (w *PrometheusCRWatcher) Close() error { return nil } -func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) { +func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, bool, error) { serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor) smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) { @@ -142,7 +144,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) { serviceMonitorInstances[key] = monitor }) if smRetrieveErr != nil { - return nil, smRetrieveErr + return nil, false, smRetrieveErr } podMonitorInstances := make(map[string]*monitoringv1.PodMonitor) @@ -152,7 +154,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) { podMonitorInstances[key] = monitor }) if pmRetrieveErr != nil { - return nil, pmRetrieveErr + return nil, false, pmRetrieveErr } store := assets.Store{ @@ -173,13 +175,22 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) { } generatedConfig, err := w.configGenerator.Generate(prom, serviceMonitorInstances, podMonitorInstances, map[string]*monitoringv1.Probe{}, &store, nil, nil, nil, []string{}) if err != nil { - return nil, err + return nil, false, err + } + + if len(generatedConfig) > 0 { + newHash := xxh3.Hash(generatedConfig) + // If not changed, shortcut and return without config change. 
+ if newHash == w.lastConfigHash { + return nil, false, nil + } + w.lastConfigHash = newHash } promCfg := &promconfig.Config{} unmarshalErr := yaml.Unmarshal(generatedConfig, promCfg) if unmarshalErr != nil { - return nil, unmarshalErr + return nil, false, unmarshalErr } // set kubeconfig path to service discovery configs, else kubernetes_sd will always attempt in-cluster @@ -192,5 +203,5 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) { } } } - return promCfg, nil + return promCfg, true, nil } diff --git a/cmd/otel-allocator/watcher/watcher.go b/cmd/otel-allocator/watcher/watcher.go index a1b0e99fa8..67514f81e5 100644 --- a/cmd/otel-allocator/watcher/watcher.go +++ b/cmd/otel-allocator/watcher/watcher.go @@ -19,7 +19,9 @@ import promconfig "github.com/prometheus/prometheus/config" type Watcher interface { // Watch watcher and supply channels which will receive change events Watch(upstreamEvents chan Event, upstreamErrors chan error) error - LoadConfig() (*promconfig.Config, error) + // LoadConfig loads the current Prometheus config. It also returns a boolean + // indicating whether the config changed since the last load; if not, the config is nil. + LoadConfig() (*promconfig.Config, bool, error) Close() error }