Skip to content

Commit

Permalink
[receiver/discovery] Emit active endpoints as entity events periodica…
Browse files Browse the repository at this point in the history
…lly (signalfx#4686)

Only applicable if log_endpoints: true
  • Loading branch information
dmitryax authored Apr 19, 2024
1 parent 1849321 commit 012775d
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 2 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

### 🛑 Breaking changes 🛑

- (Splunk) `receiver/discovery`: Emit entity events for discovered endpoints with log_endpoints: true ([#4684](https://github.com/signalfx/splunk-otel-collector/pull/4684))
- (Splunk) `receiver/discovery`: Update the component to emit entity events
- Emit entity events for discovered endpoints with log_endpoints: true ([#4684](https://github.com/signalfx/splunk-otel-collector/pull/4684))
- Ensure active endpoints emitted as entity events periodically ([#4684](https://github.com/signalfx/splunk-otel-collector/pull/4684))

### 💡 Enhancements 💡

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ require (
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/sys v0.19.0
Expand Down
17 changes: 17 additions & 0 deletions internal/receiver/discoveryreceiver/correlation.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type correlationStore interface {
GetOrCreate(receiverID component.ID, endpointID observer.EndpointID) correlation
Attrs(receiverID component.ID) map[string]string
UpdateAttrs(receiverID component.ID, attrs map[string]string)
Endpoints(updatedBefore time.Time) []observer.Endpoint
// Start the reaping loop to prevent unnecessary endpoint buildup
Start()
// Stop the reaping loop
Expand Down Expand Up @@ -109,6 +110,22 @@ func (s *store) UpdateEndpoint(endpoint observer.Endpoint, receiverID component.
}
}

// Endpoints returns all active endpoints that have not been updated since the provided time.
func (s *store) Endpoints(updatedBefore time.Time) []observer.Endpoint {
var endpoints []observer.Endpoint
s.correlations.Range(func(eID, c any) bool {
endpointID := eID.(observer.EndpointID)
endpointUnlock := s.endpointLocks.Lock(endpointID)
defer endpointUnlock()
corr := c.(*correlation)
if corr.lastState != removedState && corr.lastUpdated.Before(updatedBefore) {
endpoints = append(endpoints, c.(*correlation).endpoint)
}
return true
})
return endpoints
}

// GetOrCreate returns an existing receiver/endpoint correlation or creates a new one.
func (s *store) GetOrCreate(receiverID component.ID, endpointID observer.EndpointID) correlation {
endpointUnlock := s.endpointLocks.Lock(endpointID)
Expand Down
32 changes: 31 additions & 1 deletion internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ var (
)

type endpointTracker struct {
correlations correlationStore
config *Config
logger *zap.Logger
pLogs chan plog.Logs
observables map[component.ID]observer.Observable
correlations correlationStore
stopCh chan struct{}
notifies []*notify
// emitInterval defines an interval for emitting entity state events.
// Potentially can be exposed as a user config option if there is a need.
emitInterval time.Duration
}

type notify struct {
Expand All @@ -66,6 +70,13 @@ func newEndpointTracker(
logger: logger,
pLogs: pLogs,
correlations: correlations,
// 15 minutes is a reasonable default for emitting entity state events given the 1 hour TTL in the inventory
// service. Potentially we could expose it as a user config, but only if there is a need.
// Note that we emit entity state events on every entity change. Entities that were changed in the last
// 15 minutes are not emitted again. So the actual interval of emitting entity state events can be more than 15
// minutes but always less than 30 minutes.
emitInterval: 15 * time.Minute,
stopCh: make(chan struct{}),
}
}

Expand All @@ -82,6 +93,24 @@ func (et *endpointTracker) start() {
go observable.ListAndWatch(n)
}
et.correlations.Start()
go et.startEmitLoop()
}

func (et *endpointTracker) startEmitLoop() {
timer := time.NewTicker(et.emitInterval)
for {
select {
case <-timer.C:
for obs := range et.observables {
activeEndpoints := et.correlations.Endpoints(time.Now().Add(-et.emitInterval))
// changedState just means that we want to report the current state of the endpoint.
et.emitEndpointLogs(obs, changedState, activeEndpoints, time.Now())
}
case <-et.stopCh:
timer.Stop()
return
}
}
}

func (et *endpointTracker) stop() {
Expand All @@ -90,6 +119,7 @@ func (et *endpointTracker) stop() {
go n.observable.Unsubscribe(n)
}
et.correlations.Stop()
et.stopCh <- struct{}{}
}

func (et *endpointTracker) emitEndpointLogs(observerCID component.ID, eventType endpointState, endpoints []observer.Endpoint, received time.Time) {
Expand Down
61 changes: 61 additions & 0 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package discoveryreceiver

import (
"path"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -621,3 +622,63 @@ func TestUpdateEndpoints(t *testing.T) {
})
}
}

func TestPeriodicEntityEmitting(t *testing.T) {
logger := zap.NewNop()
cfg := createDefaultConfig().(*Config)
cfg.LogEndpoints = true
cfg.Receivers = map[component.ID]ReceiverEntry{
component.MustNewIDWithName("fake_receiver", ""): {
Rule: mustNewRule(`type == "port" && pod.name == "pod.name" && port == 1`),
},
}

ch := make(chan plog.Logs, 10)
obsID := component.MustNewIDWithName("fake_observer", "")
obs := &fakeObservable{}
et := &endpointTracker{
config: cfg,
observables: map[component.ID]observer.Observable{obsID: obs},
logger: logger,
pLogs: ch,
correlations: newCorrelationStore(logger, cfg.CorrelationTTL),
emitInterval: 100 * time.Millisecond,
stopCh: make(chan struct{}),
}
et.start()
defer et.stop()

// Wait for obs.ListAndWatch called asynchronously in the et.start()
require.Eventually(t, func() bool {
obs.lock.Lock()
defer obs.lock.Unlock()
return obs.onAdd != nil
}, 200*time.Millisecond, 10*time.Millisecond)

obs.onAdd([]observer.Endpoint{portEndpoint})

// Wait for at least 2 entity events to be emitted
require.Eventually(t, func() bool { return len(ch) >= 2 }, 1*time.Second, 50*time.Millisecond)

gotLogs := <-ch
require.Equal(t, 1, gotLogs.LogRecordCount())
// TODO: Use plogtest.IgnoreTimestamp once available
expectedLogs, failed, err := endpointToPLogs(obsID, addedState, []observer.Endpoint{portEndpoint},
gotLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime())
require.NoError(t, err)
require.Zero(t, failed)
require.NoError(t, plogtest.CompareLogs(expectedLogs, gotLogs))
}

type fakeObservable struct {
onAdd func([]observer.Endpoint)
lock sync.Mutex
}

func (f *fakeObservable) ListAndWatch(notify observer.Notify) {
f.lock.Lock()
defer f.lock.Unlock()
f.onAdd = notify.OnAdd
}

func (f *fakeObservable) Unsubscribe(observer.Notify) {}
25 changes: 25 additions & 0 deletions internal/receiver/discoveryreceiver/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discoveryreceiver

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

0 comments on commit 012775d

Please sign in to comment.