Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename process group to process details #2032

Merged
merged 6 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ var (
// The manager will apply the configuration to all instrumentations that match the config group.
type ConfigUpdate[configGroup ConfigGroup] map[configGroup]Config

type instrumentationDetails[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type instrumentationDetails[processDetails ProcessDetails, configGroup ConfigGroup] struct {
// we want to track the instrumentation even if it failed to load, to be able to report the error
// and clean up the reporter resources once the process exits.
// hence, this might be nil if the instrumentation failed to load.
inst Instrumentation
pg processGroup
pd processDetails
cg configGroup
}

type ManagerOptions[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type ManagerOptions[processDetails ProcessDetails, configGroup ConfigGroup] struct {
Logger logr.Logger

// Factories is a map of OTel distributions to their corresponding instrumentation factories.
Expand All @@ -46,7 +46,7 @@ type ManagerOptions[processGroup ProcessGroup, configGroup ConfigGroup] struct {
// based on the process event.
//
// The handler is also used to report the instrumentation lifecycle events.
Handler *Handler[processGroup, configGroup]
Handler *Handler[processDetails, configGroup]

// DetectorOptions is a list of options to configure the process detector.
//
Expand All @@ -69,27 +69,27 @@ type Manager interface {
Run(ctx context.Context) error
}

type manager[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type manager[processDetails ProcessDetails, configGroup ConfigGroup] struct {
// channel for receiving process events,
// used to detect new processes and process exits, and handle their instrumentation accordingly.
procEvents <-chan detector.ProcessEvent
detector detector.Detector
handler *Handler[processGroup, configGroup]
handler *Handler[processDetails, configGroup]
factories map[OtelDistribution]Factory
logger logr.Logger

// all the created instrumentations by pid,
// this map is not concurrent safe, so it should be accessed only from the main event loop
detailsByPid map[int]*instrumentationDetails[processGroup, configGroup]
detailsByPid map[int]*instrumentationDetails[processDetails, configGroup]

// instrumentations by workload, and aggregated by pid
// this map is not concurrent safe, so it should be accessed only from the main event loop
detailsByWorkload map[configGroup]map[int]*instrumentationDetails[processGroup, configGroup]
detailsByWorkload map[configGroup]map[int]*instrumentationDetails[processDetails, configGroup]

configUpdates <-chan ConfigUpdate[configGroup]
}

func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options ManagerOptions[processGroup, configGroup]) (Manager, error) {
func NewManager[processDetails ProcessDetails, configGroup ConfigGroup](options ManagerOptions[processDetails, configGroup]) (Manager, error) {
handler := options.Handler
if handler == nil {
return nil, errors.New("handler is required for ebpf instrumentation manager")
Expand All @@ -99,7 +99,7 @@ func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options Mana
return nil, errors.New("reporter is required for ebpf instrumentation manager")
}

if handler.ProcessGroupResolver == nil {
if handler.ProcessDetailsResolver == nil {
return nil, errors.New("details resolver is required for ebpf instrumentation manager")
}

Expand All @@ -126,19 +126,19 @@ func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options Mana
return nil, fmt.Errorf("failed to create process detector: %w", err)
}

return &manager[processGroup, configGroup]{
return &manager[processDetails, configGroup]{
procEvents: procEvents,
detector: detector,
handler: handler,
factories: options.Factories,
logger: logger.WithName("ebpf-instrumentation-manager"),
detailsByPid: make(map[int]*instrumentationDetails[processGroup, configGroup]),
detailsByWorkload: map[configGroup]map[int]*instrumentationDetails[processGroup, configGroup]{},
detailsByPid: make(map[int]*instrumentationDetails[processDetails, configGroup]),
detailsByWorkload: map[configGroup]map[int]*instrumentationDetails[processDetails, configGroup]{},
configUpdates: options.ConfigUpdates,
}, nil
}

func (m *manager[ProcessGroup, ConfigGroup]) runEventLoop(ctx context.Context) {
func (m *manager[ProcessDetails, ConfigGroup]) runEventLoop(ctx context.Context) {
// main event loop for handling instrumentations
for {
select {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) runEventLoop(ctx context.Context) {
}
}

func (m *manager[ProcessGroup, ConfigGroup]) Run(ctx context.Context) error {
func (m *manager[ProcessDetails, ConfigGroup]) Run(ctx context.Context) error {
g, errCtx := errgroup.WithContext(ctx)

g.Go(func() error {
Expand All @@ -198,14 +198,14 @@ func (m *manager[ProcessGroup, ConfigGroup]) Run(ctx context.Context) error {
return err
}

func (m *manager[ProcessGroup, ConfigGroup]) cleanInstrumentation(ctx context.Context, pid int) {
func (m *manager[ProcessDetails, ConfigGroup]) cleanInstrumentation(ctx context.Context, pid int) {
details, found := m.detailsByPid[pid]
if !found {
m.logger.V(3).Info("no instrumentation found for exiting pid, nothing to clean", "pid", pid)
return
}

m.logger.Info("cleaning instrumentation resources", "pid", pid, "process group details", details.pg)
m.logger.Info("cleaning instrumentation resources", "pid", pid, "process group details", details.pd)

if details.inst != nil {
err := details.inst.Close(ctx)
Expand All @@ -214,15 +214,15 @@ func (m *manager[ProcessGroup, ConfigGroup]) cleanInstrumentation(ctx context.Co
}
}

err := m.handler.Reporter.OnExit(ctx, pid, details.pg)
err := m.handler.Reporter.OnExit(ctx, pid, details.pd)
if err != nil {
m.logger.Error(err, "failed to report instrumentation exit")
}

m.stopTrackInstrumentation(pid)
}

func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.Context, e detector.ProcessEvent) error {
func (m *manager[ProcessDetails, ConfigGroup]) handleProcessExecEvent(ctx context.Context, e detector.ProcessEvent) error {
if details, found := m.detailsByPid[e.PID]; found && details.inst != nil {
// this can happen if we have multiple exec events for the same pid (chain loading)
// TODO: better handle this?
Expand All @@ -232,17 +232,17 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
return nil
}

pg, err := m.handler.ProcessGroupResolver.Resolve(ctx, e)
pd, err := m.handler.ProcessDetailsResolver.Resolve(ctx, e)
if err != nil {
return errors.Join(err, errFailedToGetDetails)
}

otelDisto, err := m.handler.DistributionMatcher.Distribution(ctx, pg)
otelDisto, err := m.handler.DistributionMatcher.Distribution(ctx, pd)
if err != nil {
return errors.Join(err, errFailedToGetDistribution)
}

configGroup, err := m.handler.ConfigGroupResolver.Resolve(ctx, pg, otelDisto)
configGroup, err := m.handler.ConfigGroupResolver.Resolve(ctx, pd, otelDisto)
if err != nil {
return errors.Join(err, errFailedToGetConfigGroup)
}
Expand All @@ -253,7 +253,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
}

// Fetch initial settings for the instrumentation
settings, err := m.handler.SettingsGetter.Settings(ctx, pg, otelDisto)
settings, err := m.handler.SettingsGetter.Settings(ctx, pd, otelDisto)
if err != nil {
// for k8s instrumentation config CR will be queried to get the settings
// we should always have config for this event.
Expand All @@ -269,35 +269,35 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
inst, err := factory.CreateInstrumentation(ctx, e.PID, settings)
if err != nil {
m.logger.Error(err, "failed to initialize instrumentation", "language", otelDisto.Language, "sdk", otelDisto.OtelSdk)
err = m.handler.Reporter.OnInit(ctx, e.PID, err, pg)
err = m.handler.Reporter.OnInit(ctx, e.PID, err, pd)
// TODO: should we return here the initialize error? or the handler error? or both?
return err
}

loadErr := inst.Load(ctx)

reporterErr := m.handler.Reporter.OnLoad(ctx, e.PID, loadErr, pg)
reporterErr := m.handler.Reporter.OnLoad(ctx, e.PID, loadErr, pd)
if reporterErr != nil {
m.logger.Error(reporterErr, "failed to report instrumentation load", "loaded", loadErr == nil, "pid", e.PID, "process group details", pg)
m.logger.Error(reporterErr, "failed to report instrumentation load", "loaded", loadErr == nil, "pid", e.PID, "process group details", pd)
}
if loadErr != nil {
// we need to track the instrumentation even if the load failed.
// consider a reporter which writes a persistent record for a failed/successful load
// we need to notify the reporter once that PID exits to clean up the resources - hence we track it.
// saving the inst as nil marking the instrumentation failed to load, and is not valid to run/configure/close.
m.startTrackInstrumentation(e.PID, nil, pg, configGroup)
m.startTrackInstrumentation(e.PID, nil, pd, configGroup)
m.logger.Error(err, "failed to load instrumentation", "language", otelDisto.Language, "sdk", otelDisto.OtelSdk)
// TODO: should we return here the load error? or the instance write error? or both?
return err
}

m.startTrackInstrumentation(e.PID, inst, pg, configGroup)
m.logger.Info("instrumentation loaded", "pid", e.PID, "process group details", pg)
m.startTrackInstrumentation(e.PID, inst, pd, configGroup)
m.logger.Info("instrumentation loaded", "pid", e.PID, "process group details", pd)

go func() {
err := inst.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
reporterErr := m.handler.Reporter.OnRun(ctx, e.PID, err, pg)
reporterErr := m.handler.Reporter.OnRun(ctx, e.PID, err, pd)
if reporterErr != nil {
m.logger.Error(reporterErr, "failed to report instrumentation run")
}
Expand All @@ -308,23 +308,23 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
return nil
}

func (m *manager[ProcessGroup, ConfigGroup]) startTrackInstrumentation(pid int, inst Instrumentation, processGroup ProcessGroup, configGroup ConfigGroup) {
instDetails := &instrumentationDetails[ProcessGroup, ConfigGroup]{
func (m *manager[ProcessDetails, ConfigGroup]) startTrackInstrumentation(pid int, inst Instrumentation, processDetails ProcessDetails, configGroup ConfigGroup) {
instDetails := &instrumentationDetails[ProcessDetails, ConfigGroup]{
inst: inst,
pg: processGroup,
pd: processDetails,
cg: configGroup,
}
m.detailsByPid[pid] = instDetails

if _, found := m.detailsByWorkload[configGroup]; !found {
// first instrumentation for this workload
m.detailsByWorkload[configGroup] = map[int]*instrumentationDetails[ProcessGroup, ConfigGroup]{pid: instDetails}
m.detailsByWorkload[configGroup] = map[int]*instrumentationDetails[ProcessDetails, ConfigGroup]{pid: instDetails}
} else {
m.detailsByWorkload[configGroup][pid] = instDetails
}
}

func (m *manager[ProcessGroup, ConfigGroup]) stopTrackInstrumentation(pid int) {
func (m *manager[ProcessDetails, ConfigGroup]) stopTrackInstrumentation(pid int) {
details, ok := m.detailsByPid[pid]
if !ok {
return
Expand All @@ -339,7 +339,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) stopTrackInstrumentation(pid int) {
}
}

func (m *manager[ProcessGroup, ConfigGroup]) applyInstrumentationConfigurationForSDK(ctx context.Context, configGroup ConfigGroup, config Config) error {
func (m *manager[ProcessDetails, ConfigGroup]) applyInstrumentationConfigurationForSDK(ctx context.Context, configGroup ConfigGroup, config Config) error {
var err error

configGroupInstrumentations, ok := m.detailsByWorkload[configGroup]
Expand All @@ -351,7 +351,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) applyInstrumentationConfigurationFo
if instDetails.inst == nil {
continue
}
m.logger.Info("applying configuration to instrumentation", "process group details", instDetails.pg, "configGroup", configGroup)
m.logger.Info("applying configuration to instrumentation", "process group details", instDetails.pd, "configGroup", configGroup)
applyErr := instDetails.inst.ApplyConfig(ctx, config)
err = errors.Join(err, applyErr)
}
Expand Down
49 changes: 27 additions & 22 deletions instrumentation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ type OtelDistribution struct {
OtelSdk common.OtelSdk
}

// ProcessGroup represents a group of processes that are managed together by the hosting platform.
// ProcessDetails is used to convert the common process details reported by the detector to details relevant to the hosting platform.
//
// ProcessDetails can contain details that associate a process with a group of processes that are managed together by the hosting platform.
// It may include different information depending on the platform (Kubernetes, VM, etc).
//
// For example consider an app which is launched by a bash script, the script launches a python process.
// The process may create different child processes, and the bash script may launch multiple python processes.
// In this case, the process group may include the bash script, the python process, and the child processes.
type ProcessGroup interface {
//
// Another category of information that may be included relates to language and runtime information which can be used to
// determine the OTel distribution to use.
type ProcessDetails interface {
fmt.Stringer
}

Expand All @@ -34,60 +39,60 @@ type ConfigGroup interface {
comparable
}

// ProcessGroupResolver is used to resolve the process group of a process.
type ProcessGroupResolver[processGroup ProcessGroup] interface {
// ProcessDetailsResolver is used to resolve the process details of a process.
type ProcessDetailsResolver[processDetails ProcessDetails] interface {
// Resolve will classify the process into a process group.
// Those process group details may be used in future calls when reporting the status of the instrumentation,
// or when resolving the configuration group of the process.
Resolve(context.Context, detector.ProcessEvent) (processGroup, error)
Resolve(context.Context, detector.ProcessEvent) (processDetails, error)
}

// ConfigGroupResolver is used to resolve the configuration group of a process.
type ConfigGroupResolver[processGroup ProcessGroup, configGroup ConfigGroup] interface {
type ConfigGroupResolver[processDetails ProcessDetails, configGroup ConfigGroup] interface {
// Resolve will classify the process into a configuration group.
// The Otel Distribution has already been resolved by the time this function is called, and may be used
// to determine the configuration group.
Resolve(context.Context, processGroup, OtelDistribution) (configGroup, error)
Resolve(context.Context, processDetails, OtelDistribution) (configGroup, error)
}

// Reporter is used to report the status of the instrumentation.
// It is called at different stages of the instrumentation lifecycle.
type Reporter[processGroup ProcessGroup] interface {
type Reporter[processDetails ProcessDetails] interface {
// OnInit is called when the instrumentation is initialized.
// The error parameter will be nil if the instrumentation was initialized successfully.
OnInit(ctx context.Context, pid int, err error, pg processGroup) error
OnInit(ctx context.Context, pid int, err error, pg processDetails) error

// OnLoad is called after an instrumentation is loaded successfully or failed to load.
// The error parameter will be nil if the instrumentation was loaded successfully.
OnLoad(ctx context.Context, pid int, err error, pg processGroup) error
OnLoad(ctx context.Context, pid int, err error, pg processDetails) error

// OnRun is called after the instrumentation stops running.
// An error may report a fatal error during the instrumentation run, or a closing error
// which happened during the closing of the instrumentation.
OnRun(ctx context.Context, pid int, err error, pg processGroup) error
OnRun(ctx context.Context, pid int, err error, pg processDetails) error

// OnExit is called when the instrumented process exits, and the instrumentation has already been stopped.
// For a reporter which persists the instrumentation state, this is the time to clean up the state.
OnExit(ctx context.Context, pid int, pg processGroup) error
OnExit(ctx context.Context, pid int, pg processDetails) error
}

// DistributionMatcher is used to match a process to an Otel Distribution.
type DistributionMatcher[processGroup ProcessGroup] interface {
type DistributionMatcher[processDetails ProcessDetails] interface {
// Distribution will match a process to an Otel Distribution.
Distribution(context.Context, processGroup) (OtelDistribution, error)
Distribution(context.Context, processDetails) (OtelDistribution, error)
}

// SettingsGetter is used to fetch the initial settings of an instrumentation.
type SettingsGetter[processGroup ProcessGroup] interface {
type SettingsGetter[processDetails ProcessDetails] interface {
// Settings will fetch the initial settings of an instrumentation.
Settings(context.Context, processGroup, OtelDistribution) (Settings, error)
Settings(context.Context, processDetails, OtelDistribution) (Settings, error)
}

// Handler is used to classify, report and configure instrumentations.
type Handler[processGroup ProcessGroup, configGroup comparable] struct {
ProcessGroupResolver ProcessGroupResolver[processGroup]
ConfigGroupResolver ConfigGroupResolver[processGroup, configGroup]
Reporter Reporter[processGroup]
DistributionMatcher DistributionMatcher[processGroup]
SettingsGetter SettingsGetter[processGroup]
type Handler[processDetails ProcessDetails, configGroup comparable] struct {
ProcessDetailsResolver ProcessDetailsResolver[processDetails]
ConfigGroupResolver ConfigGroupResolver[processDetails, configGroup]
Reporter Reporter[processDetails]
DistributionMatcher DistributionMatcher[processDetails]
SettingsGetter SettingsGetter[processDetails]
}
Loading
Loading