Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [target allocator] Improvements to scrape config hashing #1545

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions cmd/otel-allocator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
promconfig "github.com/prometheus/prometheus/config"
_ "github.com/prometheus/prometheus/discovery/install"
"github.com/spf13/pflag"
"github.com/zeebo/xxh3"
"gopkg.in/yaml.v2"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -76,24 +77,40 @@ type CLIConfig struct {
PromCRWatcherConf PrometheusCRWatcherConfig
}

// Load loads the target allocator configuration from the given file.
func Load(file string) (Config, error) {
var cfg Config
if err := unmarshal(&cfg, file); err != nil {
if _, err := unmarshal(&cfg, file, false); err != nil {
return Config{}, err
}
return cfg, nil
}

func unmarshal(cfg *Config, configFile string) error {
// LoadForPromCfg reads the target allocator configuration from the given file
// and returns only its Prometheus configuration (the Config field).
//
// The second result is the hash returned by unmarshal when hashing is enabled
// (an xxh3 hash of the raw file contents), which callers can compare against a
// previously stored value to detect whether the configuration changed.
//
// If the file cannot be read or parsed, it returns a nil configuration, a zero
// hash, and the error from unmarshal.
func LoadForPromCfg(file string) (*promconfig.Config, uint64, error) {
var cfg Config
// withHash=true asks unmarshal to also return the hash of the file contents.
hash, err := unmarshal(&cfg, file, true)
if err != nil {
return nil, 0, err
}
return cfg.Config, hash, nil
}

func unmarshal(cfg *Config, configFile string, withHash bool) (uint64, error) {
yamlFile, err := os.ReadFile(configFile)
if err != nil {
return err
return 0, err
}
if err = yaml.UnmarshalStrict(yamlFile, cfg); err != nil {
return fmt.Errorf("error unmarshaling YAML: %w", err)
return 0, fmt.Errorf("error unmarshaling YAML: %w", err)
}
return nil

if !withHash {
return 0, nil
}

return xxh3.Hash(yamlFile), nil
}

func ParseCLI() (CLIConfig, error) {
Expand Down
3 changes: 2 additions & 1 deletion cmd/otel-allocator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/gin-gonic/gin v1.9.0
github.com/go-kit/log v0.2.1
github.com/go-logr/logr v1.2.3
github.com/mitchellh/hashstructure v1.1.0
github.com/oklog/run v1.1.0
github.com/prometheus-operator/prometheus-operator v0.63.0
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.63.0
Expand All @@ -20,6 +19,7 @@ require (
github.com/prometheus/prometheus v1.8.2-0.20211214150951-52c693a63be1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
github.com/zeebo/xxh3 v1.0.2
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
Expand Down Expand Up @@ -129,6 +129,7 @@ require (
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
3 changes: 3 additions & 0 deletions cmd/otel-allocator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,9 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg=
go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng=
go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
Expand Down
5 changes: 4 additions & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,14 @@ func main() {
select {
case event := <-eventChan:
eventsMetric.WithLabelValues(event.Source.String()).Inc()
loadConfig, err := event.Watcher.LoadConfig()
loadConfig, changed, err := event.Watcher.LoadConfig()
if err != nil {
setupLog.Error(err, "Unable to load configuration")
continue
}
if !changed {
continue
}
err = targetDiscoverer.ApplyConfig(event.Source, loadConfig)
if err != nil {
setupLog.Error(err, "Unable to apply configuration")
Expand Down
15 changes: 2 additions & 13 deletions cmd/otel-allocator/target/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package target

import (
"github.com/go-logr/logr"
"github.com/mitchellh/hashstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
Expand All @@ -41,7 +40,6 @@ type Discoverer struct {
configsMap map[allocatorWatcher.EventSource]*config.Config
jobToScrapeConfig map[string]*config.ScrapeConfig
hook discoveryHook
scrapeConfigsHash uint64
scrapeConfigsUpdater scrapeConfigsUpdater
}

Expand Down Expand Up @@ -79,19 +77,10 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, cfg *confi
}
}

hash, err := hashstructure.Hash(m.jobToScrapeConfig, nil)
if err != nil {
return err
}
// If the hash has changed, update the stored hash and send the new config.
// Otherwise, skip updating the scrape configs.
if m.scrapeConfigsUpdater != nil && m.scrapeConfigsHash != hash {
err := m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(m.jobToScrapeConfig)
if err != nil {
if m.scrapeConfigsUpdater != nil && len(m.jobToScrapeConfig) > 0 {
if err := m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(m.jobToScrapeConfig); err != nil {
return err
}

m.scrapeConfigsHash = hash
}

if m.hook != nil {
Expand Down
8 changes: 1 addition & 7 deletions cmd/otel-allocator/target/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,7 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) {
// },
// },
}
var (
lastValidHash uint64
lastValidConfig map[string]*promconfig.ScrapeConfig
)
var lastValidConfig map[string]*promconfig.ScrapeConfig

scu := &mockScrapeConfigUpdater{}
ctx := context.Background()
Expand All @@ -299,18 +296,15 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) {
err := manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tc.cfg)
if !tc.expectErr {
assert.NoError(t, err)
assert.NotZero(t, manager.scrapeConfigsHash)
// Assert that scrape configs in manager are correctly
// reflected in the scrape job updater.
assert.Equal(t, manager.jobToScrapeConfig, scu.mockCfg)

lastValidHash = manager.scrapeConfigsHash
lastValidConfig = manager.jobToScrapeConfig
} else {
// In case of error, assert that we retain the last
// known valid config.
assert.Error(t, err)
assert.Equal(t, lastValidHash, manager.scrapeConfigsHash)
assert.Equal(t, lastValidConfig, manager.jobToScrapeConfig)
assert.Equal(t, lastValidConfig, scu.mockCfg)
}
Expand Down
14 changes: 10 additions & 4 deletions cmd/otel-allocator/watcher/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ Watcher = &FileWatcher{}
type FileWatcher struct {
logger logr.Logger
configFilePath string
lastConfigHash uint64
watcher *fsnotify.Watcher
closer chan bool
}
Expand All @@ -48,13 +49,18 @@ func NewFileWatcher(logger logr.Logger, config config.CLIConfig) (*FileWatcher,
}, nil
}

func (f *FileWatcher) LoadConfig() (*promconfig.Config, error) {
cfg, err := config.Load(f.configFilePath)
func (f *FileWatcher) LoadConfig() (*promconfig.Config, bool, error) {
promCfg, hash, err := config.LoadForPromCfg(f.configFilePath)
if err != nil {
f.logger.Error(err, "Unable to load configuration")
return nil, err
return nil, false, err
}
return cfg.Config, nil
if hash == f.lastConfigHash {
return nil, false, nil
}

f.lastConfigHash = hash
return promCfg, true, nil
}

func (f *FileWatcher) Watch(upstreamEvents chan Event, upstreamErrors chan error) error {
Expand Down
23 changes: 17 additions & 6 deletions cmd/otel-allocator/watcher/promOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus-operator/prometheus-operator/pkg/prometheus"
promconfig "github.com/prometheus/prometheus/config"
kubeDiscovery "github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/zeebo/xxh3"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -81,6 +82,7 @@ type PrometheusCRWatcher struct {
informers map[string]*informers.ForResource
stopChannel chan struct{}
configGenerator *prometheus.ConfigGenerator
lastConfigHash uint64
kubeConfigPath string

serviceMonitorSelector labels.Selector
Expand Down Expand Up @@ -133,7 +135,7 @@ func (w *PrometheusCRWatcher) Close() error {
return nil
}

func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, bool, error) {
serviceMonitorInstances := make(map[string]*monitoringv1.ServiceMonitor)

smRetrieveErr := w.informers[monitoringv1.ServiceMonitorName].ListAll(w.serviceMonitorSelector, func(sm interface{}) {
Expand All @@ -142,7 +144,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
serviceMonitorInstances[key] = monitor
})
if smRetrieveErr != nil {
return nil, smRetrieveErr
return nil, false, smRetrieveErr
}

podMonitorInstances := make(map[string]*monitoringv1.PodMonitor)
Expand All @@ -152,7 +154,7 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
podMonitorInstances[key] = monitor
})
if pmRetrieveErr != nil {
return nil, pmRetrieveErr
return nil, false, pmRetrieveErr
}

store := assets.Store{
Expand All @@ -173,13 +175,22 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
}
generatedConfig, err := w.configGenerator.Generate(prom, serviceMonitorInstances, podMonitorInstances, map[string]*monitoringv1.Probe{}, &store, nil, nil, nil, []string{})
if err != nil {
return nil, err
return nil, false, err
}

if len(generatedConfig) > 0 {
newHash := xxh3.Hash(generatedConfig)
// If the hash is unchanged, short-circuit and report that the config did not change.
if newHash == w.lastConfigHash {
return nil, false, nil
}
w.lastConfigHash = newHash
}

promCfg := &promconfig.Config{}
unmarshalErr := yaml.Unmarshal(generatedConfig, promCfg)
if unmarshalErr != nil {
return nil, unmarshalErr
return nil, false, unmarshalErr
}

// set kubeconfig path to service discovery configs, else kubernetes_sd will always attempt in-cluster
Expand All @@ -192,5 +203,5 @@ func (w *PrometheusCRWatcher) LoadConfig() (*promconfig.Config, error) {
}
}
}
return promCfg, nil
return promCfg, true, nil
}
4 changes: 3 additions & 1 deletion cmd/otel-allocator/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import promconfig "github.com/prometheus/prometheus/config"
type Watcher interface {
// Watch watcher and supply channels which will receive change events
Watch(upstreamEvents chan Event, upstreamErrors chan error) error
LoadConfig() (*promconfig.Config, error)
// LoadConfig loads the Prometheus config from the watcher's underlying source. It also
// returns a boolean indicating whether the config has changed since the last time it was
// loaded; when it has not changed, the returned config is nil.
LoadConfig() (*promconfig.Config, bool, error)
Close() error
}

Expand Down