diff --git a/internal/controller/config.go b/internal/controller/config.go index d8e73811..16daddc6 100644 --- a/internal/controller/config.go +++ b/internal/controller/config.go @@ -8,6 +8,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/ebpf-profiler/reporter" "go.opentelemetry.io/ebpf-profiler/tracer" ) @@ -30,6 +31,8 @@ type Config struct { VerboseMode bool Version bool + Reporter reporter.Reporter + Fs *flag.FlagSet } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index a58f62b7..6f9d6d4e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -33,12 +33,16 @@ type Controller struct { // The controller can set global configurations (such as the eBPF syscalls) on // setup. So there should only ever be one running. func New(cfg *Config) *Controller { - return &Controller{ - config: cfg, + c := &Controller{ + config: cfg, + reporter: cfg.Reporter, } + + return c } // Start starts the controller +// The controller should only be started once. func (c *Controller) Start(ctx context.Context) error { if err := tracer.ProbeBPFSyscall(); err != nil { return fmt.Errorf("failed to probe eBPF syscall: %w", err) @@ -86,40 +90,19 @@ func (c *Controller) Start(ctx context.Context) error { metadataCollector.AddCustomData("host.name", hostname) metadataCollector.AddCustomData("host.ip", sourceIP) - // Network operations to CA start here - var rep reporter.Reporter - // Connect to the collection agent - rep, err = reporter.Start(ctx, &reporter.Config{ - CollAgentAddr: c.config.CollAgentAddr, - DisableTLS: c.config.DisableTLS, - MaxRPCMsgSize: 32 * MiB, - MaxGRPCRetries: 5, - GRPCOperationTimeout: intervals.GRPCOperationTimeout(), - GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), - GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), - ReportInterval: intervals.ReportInterval(), - ExecutablesCacheElements: 4096, - // Next step: Calculate FramesCacheElements from numCores and samplingRate. - FramesCacheElements: 65536, - CGroupCacheElements: 1024, - SamplesPerSecond: c.config.SamplesPerSecond, - KernelVersion: kernelVersion, - HostName: hostname, - IPAddress: sourceIP, - }) + err = c.reporter.Start(ctx) if err != nil { - return fmt.Errorf("failed to start reporting: %w", err) + return fmt.Errorf("failed to start reporter: %w", err) } - c.reporter = rep - metrics.SetReporter(rep) + metrics.SetReporter(c.reporter) // Now that set the initial host metadata, start a goroutine to keep sending updates regularly. - metadataCollector.StartMetadataCollection(ctx, rep) + metadataCollector.StartMetadataCollection(ctx, c.reporter) // Load the eBPF code and map definitions trc, err := tracer.NewTracer(ctx, &tracer.Config{ - Reporter: rep, + Reporter: c.reporter, Intervals: intervals, IncludeTracers: includeTracers, FilterErrorFrames: !c.config.SendErrorFrames, @@ -167,7 +150,8 @@ func (c *Controller) Start(ctx context.Context) error { // change this log line update also the system test. log.Printf("Attached sched monitor") - if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil { + if err := startTraceHandling(ctx, c.reporter, intervals, trc, + traceHandlerCacheSize); err != nil { return fmt.Errorf("failed to start trace handling: %w", err) } diff --git a/main.go b/main.go index 98c3d2b7..7a5e0330 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,9 @@ import ( "golang.org/x/sys/unix" "go.opentelemetry.io/ebpf-profiler/internal/controller" + "go.opentelemetry.io/ebpf-profiler/internal/helpers" + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/times" "go.opentelemetry.io/ebpf-profiler/vc" log "github.com/sirupsen/logrus" @@ -96,6 +99,46 @@ func mainWithExitCode() exitCode { }() } + intervals := times.New(cfg.MonitorInterval, + cfg.ReporterInterval, cfg.ProbabilisticInterval) + + kernelVersion, err := helpers.GetKernelVersion() + if err != nil { + log.Error(err) + return exitFailure + } + + // hostname and sourceIP will be populated from the root namespace. + hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(cfg.CollAgentAddr) + if err != nil { + log.Error(err) + return exitFailure + } + + rep, err := reporter.NewOTLP(&reporter.Config{ + CollAgentAddr: cfg.CollAgentAddr, + DisableTLS: cfg.DisableTLS, + MaxRPCMsgSize: 32 << 20, // 32 MiB + MaxGRPCRetries: 5, + GRPCOperationTimeout: intervals.GRPCOperationTimeout(), + GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), + GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), + ReportInterval: intervals.ReportInterval(), + ExecutablesCacheElements: 4096, + // Next step: Calculate FramesCacheElements from numCores and samplingRate. + FramesCacheElements: 65536, + CGroupCacheElements: 1024, + SamplesPerSecond: cfg.SamplesPerSecond, + KernelVersion: kernelVersion, + HostName: hostname, + IPAddress: sourceIP, + }) + if err != nil { + log.Error(err) + return exitFailure + } + cfg.Reporter = rep + log.Infof("Starting OTEL profiling agent %s (revision %s, build timestamp %s)", vc.Version(), vc.Revision(), vc.BuildTimestamp()) diff --git a/reporter/iface.go b/reporter/iface.go index 818d5224..85e4a1b2 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -18,6 +18,13 @@ type Reporter interface { HostMetadataReporter MetricsReporter + // Start starts the reporter in the background. + // + // If the reporter needs to perform a long-running starting operation then it + // is recommended that Start() returns quickly and the long-running operation + // is performed in the background. + Start(context.Context) error + // Stop triggers a graceful shutdown of the reporter. Stop() // GetMetrics returns the reporter internal metrics. diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index c76999d3..35d6b056 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -91,6 +91,7 @@ type attrKeyValue struct { // OTLPReporter receives and transforms information to be OTLP/profiles compliant. type OTLPReporter struct { + config *Config // name is the ScopeProfile's name. name string @@ -144,6 +145,60 @@ type OTLPReporter struct { ipAddress string } +// NewOTLP returns a new instance of OTLPReporter +func NewOTLP(cfg *Config) (*OTLPReporter, error) { + executables, err := + lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32) + if err != nil { + return nil, err + } + executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. + + frames, err := lru.NewSynced[libpf.FileID, + *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]]( + cfg.FramesCacheElements, libpf.FileID.Hash32) + if err != nil { + return nil, err + } + frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. + + cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, + func(pid libpf.PID) uint32 { return uint32(pid) }) + if err != nil { + return nil, err + } + // Set a lifetime to reduce risk of invalid data in case of PID reuse. + cgroupv2ID.SetLifetime(90 * time.Second) + + // Next step: Dynamically configure the size of this LRU. + // Currently, we use the length of the JSON array in + // hostmetadata/hostmetadata.json. + hostmetadata, err := lru.NewSynced[string, string](115, hashString) + if err != nil { + return nil, err + } + + return &OTLPReporter{ + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + stopSignal: make(chan libpf.Void), + pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, + client: nil, + rpcStats: NewStatsHandler(), + executables: executables, + frames: frames, + hostmetadata: hostmetadata, + traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), + cgroupv2ID: cgroupv2ID, + }, nil +} + // hashString is a helper function for LRUs that use string as a key. // Xxh3 turned out to be the fastest hash function for strings in the FreeLRU benchmarks. // It was only outperformed by the AES hash function, which is implemented in Plan9 assembly. @@ -303,73 +358,23 @@ func (r *OTLPReporter) GetMetrics() Metrics { } // Start sets up and manages the reporting connection to a OTLP backend. -func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { - executables, err := - lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32) - if err != nil { - return nil, err - } - executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. - - frames, err := lru.NewSynced[libpf.FileID, - *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]]( - cfg.FramesCacheElements, libpf.FileID.Hash32) - if err != nil { - return nil, err - } - frames.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. - - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, - func(pid libpf.PID) uint32 { return uint32(pid) }) - if err != nil { - return nil, err - } - // Set a lifetime to reduce risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) - - // Next step: Dynamically configure the size of this LRU. - // Currently, we use the length of the JSON array in - // hostmetadata/hostmetadata.json. - hostmetadata, err := lru.NewSynced[string, string](115, hashString) - if err != nil { - return nil, err - } - - r := &OTLPReporter{ - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), - pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, - client: nil, - rpcStats: NewStatsHandler(), - executables: executables, - frames: frames, - hostmetadata: hostmetadata, - traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), - cgroupv2ID: cgroupv2ID, - } - +func (r *OTLPReporter) Start(ctx context.Context) error { // Create a child context for reporting features - ctx, cancelReporting := context.WithCancel(mainCtx) + ctx, cancelReporting := context.WithCancel(ctx) // Establish the gRPC connection before going on, waiting for a response // from the collectionAgent endpoint. // Use grpc.WithBlock() in setupGrpcConnection() for this to work. - otlpGrpcConn, err := waitGrpcEndpoint(ctx, cfg, r.rpcStats) + otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() close(r.stopSignal) - return nil, err + return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) go func() { - tick := time.NewTicker(cfg.ReportInterval) + tick := time.NewTicker(r.config.ReportInterval) defer tick.Stop() purgeTick := time.NewTicker(5 * time.Minute) defer purgeTick.Stop() @@ -383,7 +388,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { if err := r.reportOTLPProfile(ctx); err != nil { log.Errorf("Request failed: %v", err) } - tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2)) + tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) case <-purgeTick.C: // Allow the GC to purge expired entries to avoid memory leaks. r.executables.PurgeExpired() @@ -404,7 +409,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { } }() - return r, nil + return nil } // reportOTLPProfile creates and sends out an OTLP profile.