Skip to content

Commit

Permalink
Setup internal agent controller (#203)
Browse files Browse the repository at this point in the history
Co-authored-by: Christos Kalkanis <[email protected]>
  • Loading branch information
dmathieu and christos68k authored Oct 24, 2024
1 parent c088a05 commit 5af7c72
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 191 deletions.
228 changes: 228 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller"

import (
"context"
"fmt"
"time"

log "github.com/sirupsen/logrus"
"github.com/tklauser/numcpus"

"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/hostmetadata"
"go.opentelemetry.io/ebpf-profiler/internal/helpers"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/tracehandler"
"go.opentelemetry.io/ebpf-profiler/tracer"
tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types"
"go.opentelemetry.io/ebpf-profiler/util"
)

// Controller is an instance that runs, manages and stops the agent.
type Controller struct {
config *Config
reporter reporter.Reporter
tracer *tracer.Tracer
}

// New creates a new controller
// The controller can set global configurations (such as the eBPF syscalls) on
// setup. So there should only ever be one running.
func New(cfg *Config) *Controller {
return &Controller{
config: cfg,
}
}

// Start starts the controller
func (c *Controller) Start(ctx context.Context) error {
if err := tracer.ProbeBPFSyscall(); err != nil {
return fmt.Errorf("failed to probe eBPF syscall: %w", err)
}

if err := tracer.ProbeTracepoint(); err != nil {
return fmt.Errorf("failed to probe tracepoint: %w", err)
}

presentCores, err := numcpus.GetPresent()
if err != nil {
return fmt.Errorf("failed to read CPU file: %w", err)
}

traceHandlerCacheSize :=
traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores))

intervals := times.New(c.config.MonitorInterval,
c.config.ReporterInterval, c.config.ProbabilisticInterval)

// Start periodic synchronization with the realtime clock
times.StartRealtimeSync(ctx, c.config.ClockSyncInterval)

log.Debugf("Determining tracers to include")
includeTracers, err := tracertypes.Parse(c.config.Tracers)
if err != nil {
return fmt.Errorf("failed to parse the included tracers: %w", err)
}

metadataCollector := hostmetadata.NewCollector(c.config.CollAgentAddr)
metadataCollector.AddCustomData("os.type", "linux")

kernelVersion, err := helpers.GetKernelVersion()
if err != nil {
return fmt.Errorf("failed to get Linux kernel version: %w", err)
}
// OTel semantic introduced in https://github.com/open-telemetry/semantic-conventions/issues/66
metadataCollector.AddCustomData("os.kernel.release", kernelVersion)

// hostname and sourceIP will be populated from the root namespace.
hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(c.config.CollAgentAddr)
if err != nil {
log.Warnf("Failed to fetch metadata information in the root namespace: %v", err)
}
metadataCollector.AddCustomData("host.name", hostname)
metadataCollector.AddCustomData("host.ip", sourceIP)

// Network operations to CA start here
var rep reporter.Reporter
// Connect to the collection agent
rep, err = reporter.Start(ctx, &reporter.Config{
CollAgentAddr: c.config.CollAgentAddr,
DisableTLS: c.config.DisableTLS,
MaxRPCMsgSize: 32 << 20, // 32 MiB
MaxGRPCRetries: 5,
GRPCOperationTimeout: intervals.GRPCOperationTimeout(),
GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(),
ReportInterval: intervals.ReportInterval(),
CacheSize: traceHandlerCacheSize,
SamplesPerSecond: c.config.SamplesPerSecond,
KernelVersion: kernelVersion,
HostName: hostname,
IPAddress: sourceIP,
})
if err != nil {
return fmt.Errorf("failed to start reporting: %w", err)
}
c.reporter = rep

metrics.SetReporter(rep)

// Now that set the initial host metadata, start a goroutine to keep sending updates regularly.
metadataCollector.StartMetadataCollection(ctx, rep)

// Load the eBPF code and map definitions
trc, err := tracer.NewTracer(ctx, &tracer.Config{
Reporter: rep,
Intervals: intervals,
IncludeTracers: includeTracers,
FilterErrorFrames: !c.config.SendErrorFrames,
SamplesPerSecond: c.config.SamplesPerSecond,
MapScaleFactor: int(c.config.MapScaleFactor),
KernelVersionCheck: !c.config.NoKernelVersionCheck,
DebugTracer: c.config.VerboseMode,
BPFVerifierLogLevel: uint32(c.config.BpfVerifierLogLevel),
ProbabilisticInterval: c.config.ProbabilisticInterval,
ProbabilisticThreshold: c.config.ProbabilisticThreshold,
})
if err != nil {
return fmt.Errorf("failed to load eBPF tracer: %w", err)
}
c.tracer = trc
log.Printf("eBPF tracer loaded")

now := time.Now()
// Initial scan of /proc filesystem to list currently active PIDs and have them processed.
if err = trc.StartPIDEventProcessor(ctx); err != nil {
log.Errorf("failed to list processes from /proc: %v", err)
}
metrics.Add(metrics.IDProcPIDStartupMs, metrics.MetricValue(time.Since(now).Milliseconds()))
log.Debug("Completed initial PID listing")

// Attach our tracer to the perf event
if err := trc.AttachTracer(); err != nil {
return fmt.Errorf("failed to attach to perf event: %w", err)
}
log.Info("Attached tracer program")

if c.config.ProbabilisticThreshold < tracer.ProbabilisticThresholdMax {
trc.StartProbabilisticProfiling(ctx)
log.Printf("Enabled probabilistic profiling")
} else {
if err := trc.EnableProfiling(); err != nil {
return fmt.Errorf("failed to enable perf events: %w", err)
}
}

if err := trc.AttachSchedMonitor(); err != nil {
return fmt.Errorf("failed to attach scheduler monitor: %w", err)
}

// This log line is used in our system tests to verify if that the agent has started. So if you
// change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
return fmt.Errorf("failed to start trace handling: %w", err)
}

return nil
}

// Shutdown stops the controller
func (c *Controller) Shutdown() {
log.Info("Stop processing ...")
if c.reporter != nil {
c.reporter.Stop()
}

if c.tracer != nil {
c.tracer.Close()
}
}

func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
intervals *times.Times, trc *tracer.Tracer, cacheSize uint32) error {
// Spawn monitors for the various result maps
traceCh := make(chan *host.Trace)

if err := trc.StartMapMonitors(ctx, traceCh); err != nil {
return fmt.Errorf("failed to start map monitors: %v", err)
}

_, err := tracehandler.Start(ctx, rep, trc.TraceProcessor(),
traceCh, intervals, cacheSize)
return err
}

// traceCacheSize defines the maximum number of elements for the caches in tracehandler.
//
// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will
// trigger additional processing for that trace in userspace (Go). For most maps, we use
// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply
// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to
// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead).
// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
// therefore used here.
func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
const (
traceCacheIntervals = 6
traceCacheMinSize = 65536
)

maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, presentCPUCores)

size := maxElements * uint32(traceCacheIntervals)
if size < traceCacheMinSize {
size = traceCacheMinSize
}
return util.NextPowerOfTwo(size)
}

func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int,
presentCPUCores uint16) uint32 {
return uint32(uint16(samplesPerSecond) * uint16(monitorInterval.Seconds()) * presentCPUCores)
}
Loading

0 comments on commit 5af7c72

Please sign in to comment.