Skip to content

Commit

Permalink
Fix prometheus init data race
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed May 8, 2024
1 parent 521e90a commit df684db
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 57 deletions.
26 changes: 15 additions & 11 deletions components/prometheus/collector/collection.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package collector

import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/runtime/options"
)

type Collection struct {
CollectionName string
metrics map[string]*Metric
metrics *shrinkingmap.ShrinkingMap[string, *Metric]
}

func NewCollection(name string, opts ...options.Option[Collection]) *Collection {
return options.Apply(&Collection{
CollectionName: name,
metrics: make(map[string]*Metric),
}, opts, func(c *Collection) {
for _, m := range c.metrics {
m.Namespace = c.CollectionName
m.initPromMetric()
}
metrics: shrinkingmap.New[string, *Metric](),
}, opts, func(collection *Collection) {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.Namespace = collection.CollectionName
metric.initPromMetric()

return true
})
})
}

func (c *Collection) GetMetric(metricName string) *Metric {
if metric, exists := c.metrics[metricName]; exists {
return metric
metric, exists := c.metrics.Get(metricName)
if !exists {
return nil
}

return nil
return metric
}

func (c *Collection) addMetric(metric *Metric) {
if metric != nil {
c.metrics[metric.Name] = metric
c.metrics.Set(metric.Name, metric)
}
}

Expand Down
55 changes: 32 additions & 23 deletions components/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,49 @@ package collector

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
)

// Collector is responsible for creation and collection of metrics for the prometheus.
type Collector struct {
Registry *prometheus.Registry
collections map[string]*Collection
collections *shrinkingmap.ShrinkingMap[string, *Collection]
}

// New creates an instance of Manager and creates a new prometheus registry for the protocol metrics collection.
func New() *Collector {
return &Collector{
Registry: prometheus.NewRegistry(),
collections: make(map[string]*Collection),
collections: shrinkingmap.New[string, *Collection](),
}
}

func (c *Collector) RegisterCollection(coll *Collection) {
c.collections[coll.CollectionName] = coll
for _, m := range coll.metrics {
c.Registry.MustRegister(m.promMetric)
if m.initValueFunc != nil {
metricValue, labelValues := m.initValueFunc()
m.update(metricValue, labelValues...)
func (c *Collector) RegisterCollection(collection *Collection) {
c.collections.Set(collection.CollectionName, collection)
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
c.Registry.MustRegister(metric.promMetric)
if metric.initValueFunc != nil {
metricValue, labelValues := metric.initValueFunc()
metric.update(metricValue, labelValues...)
}
if m.initFunc != nil {
m.initFunc()
if metric.initFunc != nil {
metric.initFunc()
}
}

return true
})
}

// Collect collects all metrics from the registered collections.
func (c *Collector) Collect() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.collect()
}
}
return true
})
return true

Check failure on line 46 in components/prometheus/collector/collector.go

View workflow job for this annotation

GitHub Actions / GolangCI-Lint

return with no blank line before (nlreturn)
})
}

// Update updates the value of the existing metric defined by the subsystem and metricName.
Expand Down Expand Up @@ -78,11 +84,13 @@ func (c *Collector) ResetMetric(namespace string, metricName string) {
}

func (c *Collector) Shutdown() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.shutdown()
}
}
return true
})
return true

Check failure on line 92 in components/prometheus/collector/collector.go

View workflow job for this annotation

GitHub Actions / GolangCI-Lint

return with no blank line before (nlreturn)
})
}

func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
Expand All @@ -95,9 +103,10 @@ func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
}

func (c *Collector) getCollection(subsystem string) *Collection {
if collection, exists := c.collections[subsystem]; exists {
return collection
collection, exists := c.collections.Get(subsystem)
if !exists {
return nil
}

return nil
return collection
}
47 changes: 24 additions & 23 deletions components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

func init() {
Component = &app.Component{
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Configure: configure,
Run: run,
IsEnabled: func(_ *dig.Container) bool {
return ParamsMetrics.Enabled
},
Expand All @@ -56,17 +57,32 @@ type dependencies struct {
Collector *collector.Collector
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")
func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func configure() error {
if ParamsMetrics.GoMetrics {
deps.Collector.Registry.MustRegister(collectors.NewGoCollector())
}
if ParamsMetrics.ProcessMetrics {
deps.Collector.Registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
}

registerMetrics()
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)

return nil
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")

return Component.Daemon().BackgroundWorker("Prometheus exporter", func(ctx context.Context) {
Component.LogInfo("Starting Prometheus exporter ... done")
Expand Down Expand Up @@ -118,18 +134,3 @@ func run() error {
Component.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityMetrics)
}

func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func registerMetrics() {
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)
}

0 comments on commit df684db

Please sign in to comment.