From 66e568644207a50591e53d267ce187653552304e Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Fri, 25 Oct 2024 11:27:27 +0200 Subject: [PATCH 1/7] allow passing a custom reporter to the controller --- internal/controller/controller.go | 38 +++++++++++-------------------- internal/controller/options.go | 21 +++++++++++++++++ internal/controller/reporter.go | 38 +++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 25 deletions(-) create mode 100644 internal/controller/options.go create mode 100644 internal/controller/reporter.go diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 8283c0b7..d484bc81 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -30,10 +30,15 @@ type Controller struct { // New creates a new controller // The controller can set global configurations (such as the eBPF syscalls) on // setup. So there should only ever be one running. -func New(cfg *Config) *Controller { - return &Controller{ +func New(cfg *Config, opts ...Option) *Controller { + c := &Controller{ config: cfg, } + for _, opt := range opts { + c = opt.applyOption(c) + } + + return c } // Start starts the controller @@ -84,37 +89,19 @@ func (c *Controller) Start(ctx context.Context) error { metadataCollector.AddCustomData("host.name", hostname) metadataCollector.AddCustomData("host.ip", sourceIP) - // Network operations to CA start here - var rep reporter.Reporter - // Connect to the collection agent - rep, err = reporter.Start(ctx, &reporter.Config{ - CollAgentAddr: c.config.CollAgentAddr, - DisableTLS: c.config.DisableTLS, - MaxRPCMsgSize: 32 << 20, // 32 MiB - MaxGRPCRetries: 5, - GRPCOperationTimeout: intervals.GRPCOperationTimeout(), - GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), - GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), - ReportInterval: intervals.ReportInterval(), - CacheSize: traceHandlerCacheSize, - SamplesPerSecond: c.config.SamplesPerSecond, - KernelVersion: kernelVersion, - HostName: hostname, - IPAddress: sourceIP, - }) + err = c.startReporter(ctx, intervals, traceHandlerCacheSize, kernelVersion, hostname, sourceIP) if err != nil { return fmt.Errorf("failed to start reporting: %w", err) } - c.reporter = rep - metrics.SetReporter(rep) + metrics.SetReporter(c.reporter) // Now that set the initial host metadata, start a goroutine to keep sending updates regularly. - metadataCollector.StartMetadataCollection(ctx, rep) + metadataCollector.StartMetadataCollection(ctx, c.reporter) // Load the eBPF code and map definitions trc, err := tracer.NewTracer(ctx, &tracer.Config{ - Reporter: rep, + Reporter: c.reporter, Intervals: intervals, IncludeTracers: includeTracers, FilterErrorFrames: !c.config.SendErrorFrames, @@ -163,7 +150,8 @@ func (c *Controller) Start(ctx context.Context) error { // change this log line update also the system test. log.Printf("Attached sched monitor") - if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil { + if err := startTraceHandling(ctx, c.reporter, intervals, trc, + traceHandlerCacheSize); err != nil { return fmt.Errorf("failed to start trace handling: %w", err) } diff --git a/internal/controller/options.go b/internal/controller/options.go new file mode 100644 index 00000000..4ca123c9 --- /dev/null +++ b/internal/controller/options.go @@ -0,0 +1,21 @@ +package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" + +import "go.opentelemetry.io/ebpf-profiler/reporter" + +type Option interface { + applyOption(*Controller) *Controller +} +type controllerOptionFunc func(*Controller) *Controller + +func (f controllerOptionFunc) applyOption(c *Controller) *Controller { + return f(c) +} + +// WithReporter sets a custom reporter that will be run for that controller. +// This defaults to [reporter.OTLPReporter] +func WithReporter(rep reporter.Reporter) Option { + return controllerOptionFunc(func(c *Controller) *Controller { + c.reporter = rep + return c + }) +} diff --git a/internal/controller/reporter.go b/internal/controller/reporter.go new file mode 100644 index 00000000..021286e8 --- /dev/null +++ b/internal/controller/reporter.go @@ -0,0 +1,38 @@ +package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" + +import ( + "context" + + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/times" +) + +// startReporter sets up the reporter on the controller +func (c *Controller) startReporter(ctx context.Context, intervals *times.Times, + traceHandlerCacheSize uint32, kernelVersion, hostname, sourceIP string) error { + if c.reporter != nil { + return nil + } + + rep, err := reporter.Start(ctx, &reporter.Config{ + CollAgentAddr: c.config.CollAgentAddr, + DisableTLS: c.config.DisableTLS, + MaxRPCMsgSize: 32 << 20, // 32 MiB + MaxGRPCRetries: 5, + GRPCOperationTimeout: intervals.GRPCOperationTimeout(), + GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), + GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), + ReportInterval: intervals.ReportInterval(), + CacheSize: traceHandlerCacheSize, + SamplesPerSecond: c.config.SamplesPerSecond, + KernelVersion: kernelVersion, + HostName: hostname, + IPAddress: sourceIP, + }) + if err != nil { + return err + } + c.reporter = rep + + return nil +} From cea27757ea820736278f516c3f1a71b5dbdf184f Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Mon, 28 Oct 2024 09:42:27 +0100 Subject: [PATCH 2/7] move the reporter back into the main controller method --- internal/controller/controller.go | 24 ++++++++++++++++--- internal/controller/reporter.go | 38 ------------------------------- 2 files changed, 21 insertions(+), 41 deletions(-) delete mode 100644 internal/controller/reporter.go diff --git a/internal/controller/controller.go b/internal/controller/controller.go index fdc69bbe..ebe171d9 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -89,9 +89,27 @@ func (c *Controller) Start(ctx context.Context) error { metadataCollector.AddCustomData("host.name", hostname) metadataCollector.AddCustomData("host.ip", sourceIP) - err = c.startReporter(ctx, intervals, traceHandlerCacheSize, kernelVersion, hostname, sourceIP) - if err != nil { - return fmt.Errorf("failed to start reporting: %w", err) + if c.reporter == nil { + var rep reporter.Reporter + rep, err = reporter.Start(ctx, &reporter.Config{ + CollAgentAddr: c.config.CollAgentAddr, + DisableTLS: c.config.DisableTLS, + MaxRPCMsgSize: 32 << 20, // 32 MiB + MaxGRPCRetries: 5, + GRPCOperationTimeout: intervals.GRPCOperationTimeout(), + GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), + GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), + ReportInterval: intervals.ReportInterval(), + CacheSize: traceHandlerCacheSize, + SamplesPerSecond: c.config.SamplesPerSecond, + KernelVersion: kernelVersion, + HostName: hostname, + IPAddress: sourceIP, + }) + if err != nil { + return fmt.Errorf("failed to start reporting: %w", err) + } + c.reporter = rep } metrics.SetReporter(c.reporter) diff --git a/internal/controller/reporter.go b/internal/controller/reporter.go deleted file mode 100644 index 021286e8..00000000 --- a/internal/controller/reporter.go +++ /dev/null @@ -1,38 +0,0 @@ -package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" - -import ( - "context" - - "go.opentelemetry.io/ebpf-profiler/reporter" - "go.opentelemetry.io/ebpf-profiler/times" -) - -// startReporter sets up the reporter on the controller -func (c *Controller) startReporter(ctx context.Context, intervals *times.Times, - traceHandlerCacheSize uint32, kernelVersion, hostname, sourceIP string) error { - if c.reporter != nil { - return nil - } - - rep, err := reporter.Start(ctx, &reporter.Config{ - CollAgentAddr: c.config.CollAgentAddr, - DisableTLS: c.config.DisableTLS, - MaxRPCMsgSize: 32 << 20, // 32 MiB - MaxGRPCRetries: 5, - GRPCOperationTimeout: intervals.GRPCOperationTimeout(), - GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), - GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), - ReportInterval: intervals.ReportInterval(), - CacheSize: traceHandlerCacheSize, - SamplesPerSecond: c.config.SamplesPerSecond, - KernelVersion: kernelVersion, - HostName: hostname, - IPAddress: sourceIP, - }) - if err != nil { - return err - } - c.reporter = rep - - return nil -} From 850b83761c36e5db15c7412f578199087d49ed56 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Mon, 28 Oct 2024 10:16:53 +0100 Subject: [PATCH 3/7] mention that the controller should only be started once --- internal/controller/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index ebe171d9..074ce7ce 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -42,6 +42,7 @@ func New(cfg *Config, opts ...Option) *Controller { } // Start starts the controller +// The controller should only be started once. func (c *Controller) Start(ctx context.Context) error { if err := tracer.ProbeBPFSyscall(); err != nil { return fmt.Errorf("failed to probe eBPF syscall: %w", err) From c9f21866155804099caaf9f5cb66e7058539c7ed Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Mon, 28 Oct 2024 13:38:37 +0100 Subject: [PATCH 4/7] start making otlp reporter stateless --- internal/controller/config.go | 3 + internal/controller/controller.go | 32 ++------- internal/controller/options.go | 21 ------ main.go | 22 ++++++ reporter/iface.go | 7 ++ reporter/otlp_reporter.go | 111 ++++++++++++++++-------------- 6 files changed, 96 insertions(+), 100 deletions(-) delete mode 100644 internal/controller/options.go diff --git a/internal/controller/config.go b/internal/controller/config.go index d8e73811..16daddc6 100644 --- a/internal/controller/config.go +++ b/internal/controller/config.go @@ -8,6 +8,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/ebpf-profiler/reporter" "go.opentelemetry.io/ebpf-profiler/tracer" ) @@ -30,6 +31,8 @@ type Config struct { VerboseMode bool Version bool + Reporter reporter.Reporter + Fs *flag.FlagSet } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 074ce7ce..10fa4375 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -30,12 +30,10 @@ type Controller struct { // New creates a new controller // The controller can set global configurations (such as the eBPF syscalls) on // setup. So there should only ever be one running. -func New(cfg *Config, opts ...Option) *Controller { +func New(cfg *Config) *Controller { c := &Controller{ - config: cfg, - } - for _, opt := range opts { - c = opt.applyOption(c) + config: cfg, + reporter: cfg.Reporter, } return c @@ -90,27 +88,9 @@ func (c *Controller) Start(ctx context.Context) error { metadataCollector.AddCustomData("host.name", hostname) metadataCollector.AddCustomData("host.ip", sourceIP) - if c.reporter == nil { - var rep reporter.Reporter - rep, err = reporter.Start(ctx, &reporter.Config{ - CollAgentAddr: c.config.CollAgentAddr, - DisableTLS: c.config.DisableTLS, - MaxRPCMsgSize: 32 << 20, // 32 MiB - MaxGRPCRetries: 5, - GRPCOperationTimeout: intervals.GRPCOperationTimeout(), - GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), - GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), - ReportInterval: intervals.ReportInterval(), - CacheSize: traceHandlerCacheSize, - SamplesPerSecond: c.config.SamplesPerSecond, - KernelVersion: kernelVersion, - HostName: hostname, - IPAddress: sourceIP, - }) - if err != nil { - return fmt.Errorf("failed to start reporting: %w", err) - } - c.reporter = rep + err = c.reporter.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start reporter: %w", err) } metrics.SetReporter(c.reporter) diff --git a/internal/controller/options.go b/internal/controller/options.go deleted file mode 100644 index 4ca123c9..00000000 --- a/internal/controller/options.go +++ /dev/null @@ -1,21 +0,0 @@ -package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" - -import "go.opentelemetry.io/ebpf-profiler/reporter" - -type Option interface { - applyOption(*Controller) *Controller -} -type controllerOptionFunc func(*Controller) *Controller - -func (f controllerOptionFunc) applyOption(c *Controller) *Controller { - return f(c) -} - -// WithReporter sets a custom reporter that will be run for that controller. -// This defaults to [reporter.OTLPReporter] -func WithReporter(rep reporter.Reporter) Option { - return controllerOptionFunc(func(c *Controller) *Controller { - c.reporter = rep - return c - }) -} diff --git a/main.go b/main.go index 98c3d2b7..a1e7394b 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,8 @@ import ( "golang.org/x/sys/unix" "go.opentelemetry.io/ebpf-profiler/internal/controller" + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/times" "go.opentelemetry.io/ebpf-profiler/vc" log "github.com/sirupsen/logrus" @@ -96,6 +98,26 @@ func mainWithExitCode() exitCode { }() } + intervals := times.New(cfg.MonitorInterval, + cfg.ReporterInterval, cfg.ProbabilisticInterval) + + rep, err := reporter.NewOTLP(&reporter.Config{ + CollAgentAddr: cfg.CollAgentAddr, + DisableTLS: cfg.DisableTLS, + MaxRPCMsgSize: 32 << 20, // 32 MiB + MaxGRPCRetries: 5, + GRPCOperationTimeout: intervals.GRPCOperationTimeout(), + GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), + GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), + ReportInterval: intervals.ReportInterval(), + SamplesPerSecond: cfg.SamplesPerSecond, + }) + if err != nil { + log.Error(err) + return exitFailure + } + cfg.Reporter = rep + log.Infof("Starting OTEL profiling agent %s (revision %s, build timestamp %s)", vc.Version(), vc.Revision(), vc.BuildTimestamp()) diff --git a/reporter/iface.go b/reporter/iface.go index 818d5224..85e4a1b2 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -18,6 +18,13 @@ type Reporter interface { HostMetadataReporter MetricsReporter + // Start starts the reporter in the background. + // + // If the reporter needs to perform a long-running starting operation then it + // is recommended that Start() returns quickly and the long-running operation + // is performed in the background. + Start(context.Context) error + // Stop triggers a graceful shutdown of the reporter. Stop() // GetMetrics returns the reporter internal metrics. diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index ec08578e..3ca81e00 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -91,6 +91,7 @@ type attrKeyValue struct { // OTLPReporter receives and transforms information to be OTLP/profiles compliant. type OTLPReporter struct { + config *Config // name is the ScopeProfile's name. name string @@ -144,6 +145,56 @@ type OTLPReporter struct { ipAddress string } +// NewOTLP returns a new instance of OTLPReporter +func NewOTLP(cfg *Config) (*OTLPReporter, error) { + executables, err := lru.NewSynced[libpf.FileID, execInfo](cfg.CacheSize, libpf.FileID.Hash32) + if err != nil { + return nil, err + } + + frames, err := lru.NewSynced[libpf.FileID, + *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](cfg.CacheSize, libpf.FileID.Hash32) + if err != nil { + return nil, err + } + + // Next step: Dynamically configure the size of this LRU. + // Currently, we use the length of the JSON array in + // hostmetadata/hostmetadata.json. + hostmetadata, err := lru.NewSynced[string, string](115, hashString) + if err != nil { + return nil, err + } + + cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CacheSize, + func(pid libpf.PID) uint32 { return uint32(pid) }) + if err != nil { + return nil, err + } + // Set a lifetime to reduce risk of invalid data in case of PID reuse. + cgroupv2ID.SetLifetime(90 * time.Second) + + return &OTLPReporter{ + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + stopSignal: make(chan libpf.Void), + pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, + client: nil, + rpcStats: NewStatsHandler(), + executables: executables, + frames: frames, + hostmetadata: hostmetadata, + traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), + cgroupv2ID: cgroupv2ID, + }, nil +} + // hashString is a helper function for LRUs that use string as a key. // Xxh3 turned out to be the fastest hash function for strings in the FreeLRU benchmarks. // It was only outperformed by the AES hash function, which is implemented in Plan9 assembly. @@ -303,69 +354,23 @@ func (r *OTLPReporter) GetMetrics() Metrics { } // Start sets up and manages the reporting connection to a OTLP backend. -func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { - executables, err := lru.NewSynced[libpf.FileID, execInfo](cfg.CacheSize, libpf.FileID.Hash32) - if err != nil { - return nil, err - } - - frames, err := lru.NewSynced[libpf.FileID, - *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]](cfg.CacheSize, libpf.FileID.Hash32) - if err != nil { - return nil, err - } - - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CacheSize, - func(pid libpf.PID) uint32 { return uint32(pid) }) - if err != nil { - return nil, err - } - // Set a lifetime to reduce risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) - - // Next step: Dynamically configure the size of this LRU. - // Currently, we use the length of the JSON array in - // hostmetadata/hostmetadata.json. - hostmetadata, err := lru.NewSynced[string, string](115, hashString) - if err != nil { - return nil, err - } - - r := &OTLPReporter{ - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), - pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, - client: nil, - rpcStats: NewStatsHandler(), - executables: executables, - frames: frames, - hostmetadata: hostmetadata, - traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), - cgroupv2ID: cgroupv2ID, - } - +func (r *OTLPReporter) Start(ctx context.Context) error { // Create a child context for reporting features - ctx, cancelReporting := context.WithCancel(mainCtx) + ctx, cancelReporting := context.WithCancel(ctx) // Establish the gRPC connection before going on, waiting for a response // from the collectionAgent endpoint. // Use grpc.WithBlock() in setupGrpcConnection() for this to work. - otlpGrpcConn, err := waitGrpcEndpoint(ctx, cfg, r.rpcStats) + otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() close(r.stopSignal) - return nil, err + return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) go func() { - tick := time.NewTicker(cfg.ReportInterval) + tick := time.NewTicker(r.config.ReportInterval) defer tick.Stop() for { select { @@ -377,7 +382,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { if err := r.reportOTLPProfile(ctx); err != nil { log.Errorf("Request failed: %v", err) } - tick.Reset(libpf.AddJitter(cfg.ReportInterval, 0.2)) + tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) } } }() @@ -393,7 +398,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { } }() - return r, nil + return nil } // reportOTLPProfile creates and sends out an OTLP profile. From b912102de534a364d7617b2e4af3af8b7abec0d7 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Mon, 28 Oct 2024 13:50:00 +0100 Subject: [PATCH 5/7] retrieve trace cache size in main.go --- internal/controller/cache_size.go | 38 +++++++++++++++++++++++++++++++ internal/controller/controller.go | 36 +++-------------------------- main.go | 10 ++++++++ 3 files changed, 51 insertions(+), 33 deletions(-) create mode 100644 internal/controller/cache_size.go diff --git a/internal/controller/cache_size.go b/internal/controller/cache_size.go new file mode 100644 index 00000000..d742522b --- /dev/null +++ b/internal/controller/cache_size.go @@ -0,0 +1,38 @@ +package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" + +import ( + "fmt" + "time" + + "github.com/tklauser/numcpus" + "go.opentelemetry.io/ebpf-profiler/util" +) + +// TraceCacheSize defines the maximum number of elements for the caches in tracehandler. +// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will +// trigger additional processing for that trace in userspace (Go). For most maps, we use +// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply +// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to +// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead). +// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large +// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is +// therefore used here. +func TraceCacheSize(monitorInterval time.Duration, samplesPerSecond int) (uint32, error) { + const ( + traceCacheIntervals = 6 + traceCacheMinSize = 65536 + ) + + presentCores, err := numcpus.GetPresent() + if err != nil { + return 0, fmt.Errorf("failed to read CPU file: %w", err) + } + + maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, uint16(presentCores)) + + size := maxElements * uint32(traceCacheIntervals) + if size < traceCacheMinSize { + size = traceCacheMinSize + } + return util.NextPowerOfTwo(size), nil +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 10fa4375..2bdaa9c5 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,7 +6,6 @@ import ( "time" log "github.com/sirupsen/logrus" - "github.com/tklauser/numcpus" "go.opentelemetry.io/ebpf-profiler/host" "go.opentelemetry.io/ebpf-profiler/hostmetadata" @@ -17,7 +16,6 @@ import ( "go.opentelemetry.io/ebpf-profiler/tracehandler" "go.opentelemetry.io/ebpf-profiler/tracer" tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types" - "go.opentelemetry.io/ebpf-profiler/util" ) // Controller is an instance that runs, manages and stops the agent. @@ -50,14 +48,12 @@ func (c *Controller) Start(ctx context.Context) error { return fmt.Errorf("failed to probe tracepoint: %w", err) } - presentCores, err := numcpus.GetPresent() + traceHandlerCacheSize, err := + TraceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond) if err != nil { - return fmt.Errorf("failed to read CPU file: %w", err) + return fmt.Errorf("retrieve trace cache size: %w", err) } - traceHandlerCacheSize := - traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores)) - intervals := times.New(c.config.MonitorInterval, c.config.ReporterInterval, c.config.ProbabilisticInterval) @@ -182,32 +178,6 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter, return err } -// traceCacheSize defines the maximum number of elements for the caches in tracehandler. -// -// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will -// trigger additional processing for that trace in userspace (Go). For most maps, we use -// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply -// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to -// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead). -// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large -// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is -// therefore used here. -func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int, - presentCPUCores uint16) uint32 { - const ( - traceCacheIntervals = 6 - traceCacheMinSize = 65536 - ) - - maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, presentCPUCores) - - size := maxElements * uint32(traceCacheIntervals) - if size < traceCacheMinSize { - size = traceCacheMinSize - } - return util.NextPowerOfTwo(size) -} - func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int, presentCPUCores uint16) uint32 { return uint32(uint16(samplesPerSecond) * uint16(monitorInterval.Seconds()) * presentCPUCores) diff --git a/main.go b/main.go index a1e7394b..4da03219 100644 --- a/main.go +++ b/main.go @@ -101,6 +101,12 @@ func mainWithExitCode() exitCode { intervals := times.New(cfg.MonitorInterval, cfg.ReporterInterval, cfg.ProbabilisticInterval) + tcs, err := controller.TraceCacheSize(cfg.MonitorInterval, cfg.SamplesPerSecond) + if err != nil { + log.Error(err) + return exitFailure + } + rep, err := reporter.NewOTLP(&reporter.Config{ CollAgentAddr: cfg.CollAgentAddr, DisableTLS: cfg.DisableTLS, @@ -110,7 +116,11 @@ func mainWithExitCode() exitCode { GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(), GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), ReportInterval: intervals.ReportInterval(), + CacheSize: tcs, SamplesPerSecond: cfg.SamplesPerSecond, + KernelVersion: "", + HostName: "", + IPAddress: "", }) if err != nil { log.Error(err) From c167bdfaab918e7ead53b4c4ccaf9d16ed7a6984 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Mon, 28 Oct 2024 13:51:16 +0100 Subject: [PATCH 6/7] retrieve kernel version, hostname and source IP --- main.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 4da03219..92860823 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sys/unix" "go.opentelemetry.io/ebpf-profiler/internal/controller" + "go.opentelemetry.io/ebpf-profiler/internal/helpers" "go.opentelemetry.io/ebpf-profiler/reporter" "go.opentelemetry.io/ebpf-profiler/times" "go.opentelemetry.io/ebpf-profiler/vc" @@ -107,6 +108,19 @@ func mainWithExitCode() exitCode { return exitFailure } + kernelVersion, err := helpers.GetKernelVersion() + if err != nil { + log.Error(err) + return exitFailure + } + + // hostname and sourceIP will be populated from the root namespace. + hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(cfg.CollAgentAddr) + if err != nil { + log.Error(err) + return exitFailure + } + rep, err := reporter.NewOTLP(&reporter.Config{ CollAgentAddr: cfg.CollAgentAddr, DisableTLS: cfg.DisableTLS, @@ -118,9 +132,9 @@ func mainWithExitCode() exitCode { ReportInterval: intervals.ReportInterval(), CacheSize: tcs, SamplesPerSecond: cfg.SamplesPerSecond, - KernelVersion: "", - HostName: "", - IPAddress: "", + KernelVersion: kernelVersion, + HostName: hostname, + IPAddress: sourceIP, }) if err != nil { log.Error(err) From 33fdb98c8856e372a46bc42c17d5b7ee78815464 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 30 Oct 2024 14:32:52 +0100 Subject: [PATCH 7/7] move traceCacheSize back into a private method --- internal/controller/cache_size.go | 38 ------------------------------- internal/controller/controller.go | 36 ++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 41 deletions(-) delete mode 100644 internal/controller/cache_size.go diff --git a/internal/controller/cache_size.go b/internal/controller/cache_size.go deleted file mode 100644 index d742522b..00000000 --- a/internal/controller/cache_size.go +++ /dev/null @@ -1,38 +0,0 @@ -package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller" - -import ( - "fmt" - "time" - - "github.com/tklauser/numcpus" - "go.opentelemetry.io/ebpf-profiler/util" -) - -// TraceCacheSize defines the maximum number of elements for the caches in tracehandler. -// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will -// trigger additional processing for that trace in userspace (Go). For most maps, we use -// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply -// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to -// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead). -// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large -// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is -// therefore used here. -func TraceCacheSize(monitorInterval time.Duration, samplesPerSecond int) (uint32, error) { - const ( - traceCacheIntervals = 6 - traceCacheMinSize = 65536 - ) - - presentCores, err := numcpus.GetPresent() - if err != nil { - return 0, fmt.Errorf("failed to read CPU file: %w", err) - } - - maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, uint16(presentCores)) - - size := maxElements * uint32(traceCacheIntervals) - if size < traceCacheMinSize { - size = traceCacheMinSize - } - return util.NextPowerOfTwo(size), nil -} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index df4f26b2..6f9d6d4e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/tklauser/numcpus" "go.opentelemetry.io/ebpf-profiler/host" "go.opentelemetry.io/ebpf-profiler/hostmetadata" @@ -16,6 +17,7 @@ import ( "go.opentelemetry.io/ebpf-profiler/tracehandler" "go.opentelemetry.io/ebpf-profiler/tracer" tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types" + "go.opentelemetry.io/ebpf-profiler/util" ) const MiB = 1 << 20 @@ -50,12 +52,14 @@ func (c *Controller) Start(ctx context.Context) error { return fmt.Errorf("failed to probe tracepoint: %w", err) } - traceHandlerCacheSize, err := - TraceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond) + presentCores, err := numcpus.GetPresent() if err != nil { - return fmt.Errorf("retrieve trace cache size: %w", err) + return fmt.Errorf("failed to read CPU file: %w", err) } + traceHandlerCacheSize := + traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores)) + intervals := times.New(c.config.MonitorInterval, c.config.ReporterInterval, c.config.ProbabilisticInterval) @@ -180,6 +184,32 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter, return err } +// traceCacheSize defines the maximum number of elements for the caches in tracehandler. +// +// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will +// trigger additional processing for that trace in userspace (Go). For most maps, we use +// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply +// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to +// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead). +// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large +// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is +// therefore used here. +func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int, + presentCPUCores uint16) uint32 { + const ( + traceCacheIntervals = 6 + traceCacheMinSize = 65536 + ) + + maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, presentCPUCores) + + size := maxElements * uint32(traceCacheIntervals) + if size < traceCacheMinSize { + size = traceCacheMinSize + } + return util.NextPowerOfTwo(size) +} + func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int, presentCPUCores uint16) uint32 { return uint32(uint16(samplesPerSecond) * uint16(monitorInterval.Seconds()) * presentCPUCores)