Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing a custom reporter to the controller #207

Merged
merged 9 commits into from
Oct 31, 2024
3 changes: 3 additions & 0 deletions internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/tracer"
)

Expand All @@ -30,6 +31,8 @@ type Config struct {
VerboseMode bool
Version bool

Reporter reporter.Reporter

Fs *flag.FlagSet
}

Expand Down
42 changes: 13 additions & 29 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ type Controller struct {
// The controller can set global configurations (such as the eBPF syscalls) on
// setup. So there should only ever be one running.
func New(cfg *Config) *Controller {
return &Controller{
config: cfg,
c := &Controller{
config: cfg,
reporter: cfg.Reporter,
}

return c
}

// Start starts the controller
// The controller should only be started once.
func (c *Controller) Start(ctx context.Context) error {
if err := tracer.ProbeBPFSyscall(); err != nil {
return fmt.Errorf("failed to probe eBPF syscall: %w", err)
Expand Down Expand Up @@ -86,40 +90,19 @@ func (c *Controller) Start(ctx context.Context) error {
metadataCollector.AddCustomData("host.name", hostname)
metadataCollector.AddCustomData("host.ip", sourceIP)

// Network operations to CA start here
var rep reporter.Reporter
// Connect to the collection agent
rep, err = reporter.Start(ctx, &reporter.Config{
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 * MiB,
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 4096,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 65536,
CGroupCacheElements: 1024,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
err = c.reporter.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start reporting: %w", err)
return fmt.Errorf("failed to start reporter: %w", err)
}
c.reporter = rep

metrics.SetReporter(rep)
metrics.SetReporter(c.reporter)

// Now that we have set the initial host metadata, start a goroutine to keep sending updates regularly.
metadataCollector.StartMetadataCollection(ctx, rep)
metadataCollector.StartMetadataCollection(ctx, c.reporter)

// Load the eBPF code and map definitions
trc, err := tracer.NewTracer(ctx, &tracer.Config{
Reporter: rep,
Reporter: c.reporter,
Intervals: intervals,
IncludeTracers: includeTracers,
FilterErrorFrames: !c.config.SendErrorFrames,
Expand Down Expand Up @@ -167,7 +150,8 @@ func (c *Controller) Start(ctx context.Context) error {
// change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
if err := startTraceHandling(ctx, c.reporter, intervals, trc,
traceHandlerCacheSize); err != nil {
return fmt.Errorf("failed to start trace handling: %w", err)
}

Expand Down
43 changes: 43 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"golang.org/x/sys/unix"

"go.opentelemetry.io/ebpf-profiler/internal/controller"
"go.opentelemetry.io/ebpf-profiler/internal/helpers"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/vc"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -96,6 +99,46 @@ func mainWithExitCode() exitCode {
}()
}

intervals := times.New(cfg.MonitorInterval,
cfg.ReporterInterval, cfg.ProbabilisticInterval)

kernelVersion, err := helpers.GetKernelVersion()
if err != nil {
log.Error(err)
return exitFailure
}

// hostname and sourceIP will be populated from the root namespace.
hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(cfg.CollAgentAddr)
if err != nil {
log.Error(err)
return exitFailure
}

rep, err := reporter.NewOTLP(&reporter.Config{
CollAgentAddr: cfg.CollAgentAddr,
DisableTLS: cfg.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 4096,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 65536,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
if err != nil {
log.Error(err)
return exitFailure
}
cfg.Reporter = rep

log.Infof("Starting OTEL profiling agent %s (revision %s, build timestamp %s)",
vc.Version(), vc.Revision(), vc.BuildTimestamp())

Expand Down
7 changes: 7 additions & 0 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type Reporter interface {
HostMetadataReporter
MetricsReporter

// Start starts the reporter in the background.
//
// If the reporter needs to perform a long-running starting operation then it
// is recommended that Start() returns quickly and the long-running operation
// is performed in the background.
Comment on lines +23 to +25
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we should have this here as in some cases (e.g. long-running operation that can fail) it's going to create problems (reporter that hasn't initialized properly with the rest of the agent being oblivious and executing as normal).

I'd reword to the following or remove altogether:

Suggested change
// If the reporter needs to perform a long-running starting operation then it
// is recommended that Start() returns quickly and the long-running operation
// is performed in the background.
// If this method needs to perform a long-running operation that can NOT fail, it is
// recommended that Start returns quickly and the long-running operation is
// performed in the background.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. Running an HTTP server can fail, yet that's a long-running operation that should not run synchronously here.
If folks need something long-running that can report errors, they should setup a channel, or an error handler to be able to report the error upstream.

Note that this comment is heavily inspired by the one on collector component, which does the same thing.
https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/component.go#L32

Start(context.Context) error

// Stop triggers a graceful shutdown of the reporter.
Stop()
// GetMetrics returns the reporter internal metrics.
Expand Down
119 changes: 62 additions & 57 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type attrKeyValue struct {

// OTLPReporter receives and transforms information to be OTLP/profiles compliant.
type OTLPReporter struct {
config *Config
// name is the ScopeProfile's name.
name string

Expand Down Expand Up @@ -144,6 +145,60 @@ type OTLPReporter struct {
ipAddress string
}

// NewOTLP returns a new instance of OTLPReporter.
//
// The reporter is fully constructed from cfg but performs no network
// activity here; call Start to begin reporting.
func NewOTLP(cfg *Config) (*OTLPReporter, error) {
	// Cache of executable info, keyed by file ID. Sized from config.
	executables, err :=
		lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32)
	if err != nil {
		return nil, err
	}
	executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

	// Cache of per-executable frame source info, keyed by file ID.
	// Values are individually lock-protected maps of address/line -> sourceInfo.
	frames, err := lru.NewSynced[libpf.FileID,
		*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](
		cfg.FramesCacheElements, libpf.FileID.Hash32)
	if err != nil {
		return nil, err
	}
	frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

	// PID -> cgroupv2 ID cache; the PID itself serves as the hash.
	cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
		func(pid libpf.PID) uint32 { return uint32(pid) })
	if err != nil {
		return nil, err
	}
	// Set a lifetime to reduce risk of invalid data in case of PID reuse.
	cgroupv2ID.SetLifetime(90 * time.Second)

	// Next step: Dynamically configure the size of this LRU.
	// Currently, we use the length of the JSON array in
	// hostmetadata/hostmetadata.json.
	hostmetadata, err := lru.NewSynced[string, string](115, hashString)
	if err != nil {
		return nil, err
	}

	// Wire the reporter; client stays nil until Start establishes the
	// gRPC connection, and stopSignal is closed on shutdown.
	return &OTLPReporter{
		config:                  cfg,
		name:                    cfg.Name,
		version:                 cfg.Version,
		kernelVersion:           cfg.KernelVersion,
		hostName:                cfg.HostName,
		ipAddress:               cfg.IPAddress,
		samplesPerSecond:        cfg.SamplesPerSecond,
		hostID:                  strconv.FormatUint(cfg.HostID, 10),
		stopSignal:              make(chan libpf.Void),
		pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
		client:                  nil,
		rpcStats:                NewStatsHandler(),
		executables:             executables,
		frames:                  frames,
		hostmetadata:            hostmetadata,
		traceEvents:             xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}),
		cgroupv2ID:              cgroupv2ID,
	}, nil
}

// hashString is a helper function for LRUs that use string as a key.
// Xxh3 turned out to be the fastest hash function for strings in the FreeLRU benchmarks.
// It was only outperformed by the AES hash function, which is implemented in Plan9 assembly.
Expand Down Expand Up @@ -303,73 +358,23 @@ func (r *OTLPReporter) GetMetrics() Metrics {
}

// Start sets up and manages the reporting connection to a OTLP backend.
func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
executables, err :=
lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

frames, err := lru.NewSynced[libpf.FileID,
*xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](
cfg.FramesCacheElements, libpf.FileID.Hash32)
if err != nil {
return nil, err
}
frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items.

cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)

// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
hostmetadata, err := lru.NewSynced[string, string](115, hashString)
if err != nil {
return nil, err
}

r := &OTLPReporter{
name: cfg.Name,
version: cfg.Version,
kernelVersion: cfg.KernelVersion,
hostName: cfg.HostName,
ipAddress: cfg.IPAddress,
samplesPerSecond: cfg.SamplesPerSecond,
hostID: strconv.FormatUint(cfg.HostID, 10),
stopSignal: make(chan libpf.Void),
pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout,
client: nil,
rpcStats: NewStatsHandler(),
executables: executables,
frames: frames,
hostmetadata: hostmetadata,
traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}),
cgroupv2ID: cgroupv2ID,
}

func (r *OTLPReporter) Start(ctx context.Context) error {
// Create a child context for reporting features
ctx, cancelReporting := context.WithCancel(mainCtx)
ctx, cancelReporting := context.WithCancel(ctx)

// Establish the gRPC connection before going on, waiting for a response
// from the collectionAgent endpoint.
// Use grpc.WithBlock() in setupGrpcConnection() for this to work.
otlpGrpcConn, err := waitGrpcEndpoint(ctx, cfg, r.rpcStats)
otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats)
if err != nil {
cancelReporting()
close(r.stopSignal)
return nil, err
return err
}
r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn)

go func() {
tick := time.NewTicker(cfg.ReportInterval)
tick := time.NewTicker(r.config.ReportInterval)
defer tick.Stop()
purgeTick := time.NewTicker(5 * time.Minute)
defer purgeTick.Stop()
Expand All @@ -383,7 +388,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
if err := r.reportOTLPProfile(ctx); err != nil {
log.Errorf("Request failed: %v", err)
}
tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2))
tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2))
case <-purgeTick.C:
// Allow the GC to purge expired entries to avoid memory leaks.
r.executables.PurgeExpired()
Expand All @@ -404,7 +409,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) {
}
}()

return r, nil
return nil
}

// reportOTLPProfile creates and sends out an OTLP profile.
Expand Down