Skip to content

Commit

Permalink
1. Separate observer for each pipeline 2. Register measure for each p…
Browse files Browse the repository at this point in the history
…ipeline and get the measure during collect for the called pipeline
  • Loading branch information
pree-dew committed Oct 28, 2024
1 parent b015b16 commit 3d2c60a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 47 deletions.
82 changes: 52 additions & 30 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)

// Add the measures to the pipeline. It is required to maintain
// measures per pipeline to avoid calling the measure that
// is not part of the pipeline.
insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
Expand Down Expand Up @@ -440,52 +450,58 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
// Don't allocate a observer if not needed.
return noopRegister{}, nil
}

reg := newObserver()
unregs := make([]func(), len(m.pipes))
var err error
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
for ix, pipe := range m.pipes {
reg := newObserver(pipe)
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
continue
}
reg.registerInt64(o.observableID)
case float64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
reg.registerInt64(o.observableID)
case float64Observable:
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
continue
reg.registerFloat64(o.observableID)
default:
// Instrument external to the SDK.
return nil, fmt.Errorf("invalid observable: from different implementation")
}
reg.registerFloat64(o.observableID)
default:
// Instrument external to the SDK.
return nil, fmt.Errorf("invalid observable: from different implementation")
}
}

if reg.len() == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
if reg.len() == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

// Some or all instruments were valid.
cBack := func(ctx context.Context) error { return f(ctx, reg) }
unregs[ix] = pipe.addMultiCallback(cBack)
}

// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
return m.pipes.registerMultiCallbacks(unregs), err
}

type observer struct {
embedded.Observer

pipe *pipeline
float64 map[observableID[float64]]struct{}
int64 map[observableID[int64]]struct{}
}

func newObserver() observer {
func newObserver(p *pipeline) observer {
return observer{
pipe: p,
float64: make(map[observableID[float64]]struct{}),
int64: make(map[observableID[int64]]struct{}),
}
Expand Down Expand Up @@ -530,7 +546,10 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.float64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
Expand All @@ -555,7 +574,10 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
oImpl.observe(v, c.Attributes())
measures := r.pipe.int64Measures[oImpl.observableID]
for _, m := range measures {
m(context.Background(), v, c.Attributes())
}
}

type noopRegister struct{ embedded.Registration }
Expand Down
46 changes: 33 additions & 13 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
exemplarFilter: exemplarFilter,
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
Expand All @@ -64,10 +66,32 @@ type pipeline struct {
views []View

sync.Mutex
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
}

// addInt64Measure adds a new int64 measure to the pipeline for each observer.
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
p.Lock()
defer p.Unlock()
if _, ok := p.int64Measures[id]; ok {
return
}
p.int64Measures[id] = m
}

// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
p.Lock()
defer p.Unlock()
if _, ok := p.float64Measures[id]; ok {
return
}
p.float64Measures[id] = m
}

// addSync adds the instrumentSync to pipeline p with scope. This method is not
Expand Down Expand Up @@ -574,11 +598,7 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View, exempl
return pipes
}

func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
unregs[i] = pipe.addMultiCallback(c)
}
func (p pipelines) registerMultiCallbacks(unregs []func()) metric.Registration {
return unregisterFuncs{f: unregs}
}

Expand Down
10 changes: 6 additions & 4 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -528,22 +529,23 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
mp := NewMeterProvider(WithReader(r1), WithReader(r2))
m := mp.Meter("test")

var val int64 = 1
var val atomic.Int64
val.Add(1)
measure := func(_ context.Context, m metric.Meter) {
oc, err := m.Int64ObservableCounter("int64-observable-counter")
require.NoError(t, err)
_, err = m.RegisterCallback(
// SDK periodically calls this function to collect data.
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(oc, val)
o.ObserveInt64(oc, val.Load())
return nil
}, oc)
require.NoError(t, err)
}
ctx := context.Background()
measure(ctx, m)
rm := new(metricdata.ResourceMetrics)
val++
val.Add(1)

// adding sleep deliberately so that the callback get triggered
time.Sleep(2 * time.Second)
Expand All @@ -553,7 +555,7 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
assert.Len(t, rm.ScopeMetrics[0].Metrics, 1)
assert.Equal(t, int64(2), rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value)

val++
val.Add(1)
time.Sleep(1 * time.Second)
err = r2.Collect(ctx, rm)
require.NoError(t, err)
Expand Down

0 comments on commit 3d2c60a

Please sign in to comment.