Skip to content

Commit

Permalink
refactoring instrument creation
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Jan 17, 2024
1 parent e4a707f commit 37c37c0
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 313 deletions.
122 changes: 54 additions & 68 deletions pkg/audit/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package audit

import (
"context"
"errors"
"sync"
"time"

Expand All @@ -22,72 +21,13 @@ const (
enforcementActionKey = "enforcement_action"
)

var (
violationsM metric.Int64ObservableGauge
auditDurationM metric.Float64Histogram
lastRunStartTimeM metric.Float64ObservableGauge
lastRunEndTimeM metric.Float64ObservableGauge
meter metric.Meter
)

func init() {
var err error
meter = otel.GetMeterProvider().Meter("gatekeeper")

violationsM, err = meter.Int64ObservableGauge(
violationsMetricName,
metric.WithDescription("Total number of audited violations"),
)

if err != nil {
panic(err)
}

auditDurationM, err = meter.Float64Histogram(
auditDurationMetricName,
metric.WithDescription("Latency of audit operation in seconds"))
if err != nil {
panic(err)
}

lastRunStartTimeM, err = meter.Float64ObservableGauge(
lastRunStartTimeMetricName,
metric.WithDescription("Timestamp of last audit run starting time"),
)
if err != nil {
panic(err)
}

lastRunEndTimeM, err = meter.Float64ObservableGauge(
lastRunEndTimeMetricName,
metric.WithDescription("Timestamp of last audit run ending time"),
)
if err != nil {
panic(err)
}

view.Register(sdkmetric.NewView(
sdkmetric.Instrument{Name: auditDurationMetricName},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{1 * 60, 3 * 60, 5 * 60, 10 * 60, 15 * 60, 20 * 60, 40 * 60, 80 * 60, 160 * 60, 320 * 60},
},
},
))
}
var auditDurationM metric.Float64Histogram

func (r *reporter) registerCallback() error {
_, err1 := meter.RegisterCallback(r.observeTotalViolations, violationsM)
_, err2 := meter.RegisterCallback(r.observeRunEnd, lastRunEndTimeM)
_, err3 := meter.RegisterCallback(r.observeRunStart, lastRunStartTimeM)
return errors.Join(err1, err2, err3)
}

func (r *reporter) observeTotalViolations(_ context.Context, o metric.Observer) error {
func (r *reporter) observeTotalViolations(_ context.Context, o metric.Int64Observer) error {
r.mu.RLock()
defer r.mu.RUnlock()
for k, v := range r.totalViolationsPerEnforcementAction {
o.ObserveInt64(violationsM, v, metric.WithAttributes(attribute.String(enforcementActionKey, string(k))))
o.Observe(v, metric.WithAttributes(attribute.String(enforcementActionKey, string(k))))
}
return nil
}
Expand Down Expand Up @@ -121,24 +61,70 @@ func (r *reporter) reportRunEnd(t time.Time) error {
return nil
}

func (r *reporter) observeRunStart(_ context.Context, o metric.Observer) error {
func (r *reporter) observeRunStart(_ context.Context, o metric.Float64Observer) error {
r.mu.RLock()
defer r.mu.RUnlock()
o.ObserveFloat64(lastRunStartTimeM, float64(r.startTime.Unix()))
o.Observe(float64(r.startTime.Unix()))
return nil
}

func (r *reporter) observeRunEnd(_ context.Context, o metric.Observer) error {
func (r *reporter) observeRunEnd(_ context.Context, o metric.Float64Observer) error {
r.mu.RLock()
defer r.mu.RUnlock()
o.ObserveFloat64(lastRunEndTimeM, float64(r.endTime.Unix()))
o.Observe(float64(r.endTime.Unix()))
return nil
}

// newStatsReporter creates a reporter for audit metrics.
func newStatsReporter() (*reporter, error) {
r := &reporter{}
return r, r.registerCallback()
var err error
meter := otel.GetMeterProvider().Meter("gatekeeper")

_, err = meter.Int64ObservableGauge(
violationsMetricName,
metric.WithDescription("Total number of audited violations"),
metric.WithInt64Callback(r.observeTotalViolations),
)

if err != nil {
return nil, err
}

auditDurationM, err = meter.Float64Histogram(
auditDurationMetricName,
metric.WithDescription("Latency of audit operation in seconds"))
if err != nil {
return nil, err
}

_, err = meter.Float64ObservableGauge(
lastRunStartTimeMetricName,
metric.WithDescription("Timestamp of last audit run starting time"),
metric.WithFloat64Callback(r.observeRunStart),
)
if err != nil {
return nil, err
}

_, err = meter.Float64ObservableGauge(
lastRunEndTimeMetricName,
metric.WithDescription("Timestamp of last audit run ending time"),
metric.WithFloat64Callback(r.observeRunEnd),
)
if err != nil {
return nil, err
}

view.Register(sdkmetric.NewView(
sdkmetric.Instrument{Name: auditDurationMetricName},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{1 * 60, 3 * 60, 5 * 60, 10 * 60, 15 * 60, 20 * 60, 40 * 60, 80 * 60, 160 * 60, 320 * 60},
},
},
))
return r, nil
}

type reporter struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/audit/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@ import (
testmetric "github.com/open-policy-agent/gatekeeper/v3/test/metrics"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

func initializeTestInstruments(t *testing.T) (rdr *sdkmetric.PeriodicReader, r *reporter) {
var err error
r, err = newStatsReporter()
assert.NoError(t, err)
rdr = sdkmetric.NewPeriodicReader(new(testmetric.FnExporter))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
meter = mp.Meter("test")
meter := mp.Meter("test")

violationsM, err = meter.Int64ObservableGauge(violationsMetricName)
_, err = meter.Int64ObservableGauge(violationsMetricName, metric.WithInt64Callback(r.observeTotalViolations))
assert.NoError(t, err)
auditDurationM, err = meter.Float64Histogram(auditDurationMetricName)
assert.NoError(t, err)
lastRunStartTimeM, err = meter.Float64ObservableGauge(lastRunStartTimeMetricName)
assert.NoError(t, err)
lastRunEndTimeM, err = meter.Float64ObservableGauge(lastRunEndTimeMetricName)
_, err = meter.Float64ObservableGauge(lastRunStartTimeMetricName, metric.WithFloat64Callback(r.observeRunStart))
assert.NoError(t, err)

r, err = newStatsReporter()
_, err = meter.Float64ObservableGauge(lastRunEndTimeMetricName, metric.WithFloat64Callback(r.observeRunEnd))
assert.NoError(t, err)

return rdr, r
Expand Down
35 changes: 11 additions & 24 deletions pkg/controller/constraint/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,11 @@ const (
statusKey = "status"
)

var (
constraintsM metric.Int64ObservableGauge
meter metric.Meter
)

func init() {
var err error
meter = otel.GetMeterProvider().Meter("gatekeeper")
constraintsM, err = meter.Int64ObservableGauge(
constraintsMetricName,
metric.WithDescription("Current number of known constraints"))
if err != nil {
panic(err)
}
}

func (r *reporter) observeConstraints(_ context.Context, observer metric.Observer) error {
func (r *reporter) observeConstraints(_ context.Context, observer metric.Int64Observer) error {
r.mux.RLock()
defer r.mux.RUnlock()
for t, v := range r.constraintsReport {
observer.ObserveInt64(constraintsM, v, metric.WithAttributes(attribute.String(enforcementActionKey, string(t.enforcementAction)), attribute.String(statusKey, string(t.status))))
observer.Observe(v, metric.WithAttributes(attribute.String(enforcementActionKey, string(t.enforcementAction)), attribute.String(statusKey, string(t.status))))
}
return nil
}
Expand All @@ -58,12 +42,15 @@ type StatsReporter interface {
// newStatsReporter creates a reporter for audit metrics.
func newStatsReporter() (*reporter, error) {
r := &reporter{}
return r, r.registerCallback()
}

func (r *reporter) registerCallback() error {
_, err := meter.RegisterCallback(r.observeConstraints, constraintsM)
return err
var err error
meter := otel.GetMeterProvider().Meter("gatekeeper")
_, err = meter.Int64ObservableGauge(
constraintsMetricName,
metric.WithDescription("Current number of known constraints"), metric.WithInt64Callback(r.observeConstraints))
if err != nil {
return nil, err
}
return r, nil
}

type reporter struct {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/constraint/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
testmetric "github.com/open-policy-agent/gatekeeper/v3/test/metrics"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
Expand All @@ -18,12 +19,12 @@ func initializeTestInstruments(t *testing.T) (rdr *sdkmetric.PeriodicReader, r *
var err error
rdr = sdkmetric.NewPeriodicReader(new(testmetric.FnExporter))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
meter = mp.Meter("test")
r, err = newStatsReporter()
assert.NoError(t, err)
meter := mp.Meter("test")

// Ensure the pipeline has a callback setup
constraintsM, err = meter.Int64ObservableGauge(constraintsMetricName)
assert.NoError(t, err)
r, err = newStatsReporter()
_, err = meter.Int64ObservableGauge(constraintsMetricName, metric.WithInt64Callback(r.observeConstraints))
assert.NoError(t, err)
return rdr, r
}
Expand Down
40 changes: 15 additions & 25 deletions pkg/controller/constrainttemplate/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@ const (
)

var (
ctM metric.Int64ObservableGauge
ingestCountM metric.Int64Counter
ingestDurationM metric.Float64Histogram
meter metric.Meter
)

func init() {
func (r *reporter) reportIngestDuration(ctx context.Context, status metrics.Status, d time.Duration) error {
ingestDurationM.Record(ctx, d.Seconds(), metric.WithAttributes(attribute.String(statusKey, string(status))))
ingestCountM.Add(ctx, 1, metric.WithAttributes(attribute.String(statusKey, string(status))))
return nil
}

// newStatsReporter creates a reporter for watch metrics.
func newStatsReporter() *reporter {
var err error
meter = otel.GetMeterProvider().Meter("gatekeeper")
ctM, err = meter.Int64ObservableGauge(
reg := &ctRegistry{cache: make(map[types.NamespacedName]metrics.Status)}
r := &reporter{registry: reg}
meter := otel.GetMeterProvider().Meter("gatekeeper")
_, err = meter.Int64ObservableGauge(
ctMetricName,
metric.WithDescription(ctDesc),
metric.WithInt64Callback(r.observeCTM),
)

if err != nil {
Expand Down Expand Up @@ -65,19 +73,6 @@ func init() {
},
},
))
}

func (r *reporter) reportIngestDuration(ctx context.Context, status metrics.Status, d time.Duration) error {
ingestDurationM.Record(ctx, d.Seconds(), metric.WithAttributes(attribute.String(statusKey, string(status))))
ingestCountM.Add(ctx, 1, metric.WithAttributes(attribute.String(statusKey, string(status))))
return nil
}

// newStatsReporter creates a reporter for watch metrics.
func newStatsReporter() *reporter {
reg := &ctRegistry{cache: make(map[types.NamespacedName]metrics.Status)}
r := &reporter{registry: reg}
_ = r.registerCallback()
return r
}

Expand Down Expand Up @@ -139,16 +134,11 @@ func (r *ctRegistry) report(_ context.Context, mReporter *reporter) {
}
}

func (r *reporter) registerCallback() error {
_, err := meter.RegisterCallback(r.observeCTM, ctM)
return err
}

func (r *reporter) observeCTM(_ context.Context, o metric.Observer) error {
func (r *reporter) observeCTM(_ context.Context, o metric.Int64Observer) error {
r.mu.RLock()
defer r.mu.RUnlock()
for status, count := range r.ctReport {
o.ObserveInt64(ctM, count, metric.WithAttributes(attribute.String(statusKey, string(status))))
o.Observe(count, metric.WithAttributes(attribute.String(statusKey, string(status))))
}
return nil
}
7 changes: 4 additions & 3 deletions pkg/controller/constrainttemplate/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
testmetric "github.com/open-policy-agent/gatekeeper/v3/test/metrics"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
Expand All @@ -19,7 +20,8 @@ func initializeTestInstruments(t *testing.T) (rdr *sdkmetric.PeriodicReader, r *
var err error
rdr = sdkmetric.NewPeriodicReader(new(testmetric.FnExporter))
mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
meter = mp.Meter("test")
r = newStatsReporter()
meter := mp.Meter("test")

// Ensure the pipeline has a callback setup
ingestDurationM, err = meter.Float64Histogram(ingestDuration)
Expand All @@ -28,9 +30,8 @@ func initializeTestInstruments(t *testing.T) (rdr *sdkmetric.PeriodicReader, r *
ingestCountM, err = meter.Int64Counter(ingestCount)
assert.NoError(t, err)

ctM, err = meter.Int64ObservableGauge(ctMetricName)
_, err = meter.Int64ObservableGauge(ctMetricName, metric.WithInt64Callback(r.observeCTM))
assert.NoError(t, err)
r = newStatsReporter()
return rdr, r
}

Expand Down
Loading

0 comments on commit 37c37c0

Please sign in to comment.