Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle leak of process info in hostfs provider for add_session_metadata #42398

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand Down Expand Up @@ -58,7 +59,7 @@ func New(cfg *cfg.C) (beat.Processor, error) {

ctx, cancel := context.WithCancel(context.Background())
reader := procfs.NewProcfsReader(*logger)
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(monitoring.Default, reader, *logger, c.DBReaperPeriod, c.ReapProcesses)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create DB: %w", err)
Expand Down Expand Up @@ -182,7 +183,7 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {
fullProcess, err = p.db.GetProcess(pid)
if err != nil {
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
p.logger.Debugw("PID not found in provider", "pid", pid, "error", err)
p.logger.Debugf("PID %d not found in provider: %s", pid, err)
return nil, e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package sessionmd

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

var (
Expand Down Expand Up @@ -340,7 +342,7 @@ func TestEnrich(t *testing.T) {
for _, tt := range enrichTests {
t.Run(tt.testName, func(t *testing.T) {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(monitoring.NewRegistry(), reader, *logger, time.Second*30, false)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
Expand Down
13 changes: 9 additions & 4 deletions x-pack/auditbeat/processors/sessionmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@

package sessionmd

import "time"

// Config for add_session_metadata processor.
type config struct {
Backend string `config:"backend"`
PIDField string `config:"pid_field"`
Backend string `config:"backend"`
PIDField string `config:"pid_field"`
DBReaperPeriod time.Duration `config:"db_reaper_period"`
ReapProcesses bool `config:"reap_processes"`
}

func defaultConfig() config {
return config{
Backend: "auto",
PIDField: "process.pid",
Backend: "auto",
PIDField: "process.pid",
DBReaperPeriod: time.Second * 30,
}
}
97 changes: 74 additions & 23 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package processdb

import (
"container/heap"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/timeutils"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type EntryType string
Expand Down Expand Up @@ -63,14 +63,18 @@ const (
)

type Process struct {
Tgid int
PIDs types.PIDInfo
Creds types.CredInfo
CTTY tty.TTYDev
Argv []string
Cwd string
Env map[string]string
Filename string
ExitCode int32

// ProcfsLookupFail is true if procfs couldn't find a matching PID for us
ProcfsLookupFail bool
InsertTime time.Time
}

var (
Expand Down Expand Up @@ -175,10 +179,24 @@ type DB struct {
entryLeaderRelationships map[uint32]uint32
procfs procfs.Reader
stopChan chan struct{}
removalCandidates rcHeap
removalMap map[uint32]removalCandidate

// used for metrics reporting
stats *Stats

// knobs for the reaper thread follows

// how often the reaper runs
reaperPeriod time.Duration
// used for testing
skipReaper bool
// the reaper will remove processes events, an well as exit events
reapProcesses bool
// the age at which we'll reap a process that has no procfs data
processReapAfter time.Duration
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
func NewDB(metrics *monitoring.Registry, reader procfs.Reader, logger logp.Logger, reaperPeriod time.Duration, reapProcesses bool) (*DB, error) {
once.Do(initialize)
if initError != nil {
return &DB{}, initError
Expand All @@ -190,7 +208,16 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
entryLeaderRelationships: make(map[uint32]uint32),
procfs: reader,
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
removalMap: make(map[uint32]removalCandidate),
reaperPeriod: reaperPeriod,
skipReaper: false,
stats: NewStats(metrics),
reapProcesses: reapProcesses,
processReapAfter: time.Minute * 10,
}
logger.Infof("starting sessionDB reaper with interval %s", db.reaperPeriod)
if db.reapProcesses {
logger.Info("WARNING: reaping orphaned processes. May result in data loss.")
}
db.startReaper()
return &db, nil
Expand Down Expand Up @@ -260,18 +287,35 @@ func (db *DB) insertProcess(process Process) {
}
}

// InsertExec adds an exec event
func (db *DB) InsertExec(exec types.ProcessExecEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()

proc := Process{
PIDs: pidInfoFromProto(exec.PIDs),
Creds: credInfoFromProto(exec.Creds),
CTTY: ttyDevFromProto(exec.CTTY),
Argv: exec.Argv,
Cwd: exec.CWD,
Env: exec.Env,
Filename: exec.Filename,
PIDs: pidInfoFromProto(exec.PIDs),
Creds: credInfoFromProto(exec.Creds),
CTTY: ttyDevFromProto(exec.CTTY),
Argv: exec.Argv,
Cwd: exec.CWD,
Filename: exec.Filename,
ProcfsLookupFail: exec.ProcfsLookupFail,
InsertTime: time.Now(),
}
if proc.ProcfsLookupFail {
db.stats.procfsLookupFail.Add(1)
}

// check to see if an orphaned exit event maps to this exec event.
// the out-of-order problem where we get the exit before the exec usually happens under load.
// if we don't track orphaned processes like this, we'll never scrub them from the DB.
if evt, ok := db.removalMap[proc.PIDs.Tgid]; ok {
proc.ExitCode = evt.exitCode
db.stats.resolvedOrphanExits.Add(1)
db.logger.Debugf("resolved orphan exit for pid %d", proc.PIDs.Tgid)
//evt.orphanTime // set it to remove on the next reaper pass
evt.startTime = proc.PIDs.StartTimeNS
db.removalMap[proc.PIDs.Tgid] = evt
}

db.processes[exec.PIDs.Tgid] = proc
Expand Down Expand Up @@ -387,6 +431,7 @@ func (db *DB) evaluateEntryLeader(p Process) *uint32 {
return nil
}

// InsertSetsid adds a set SID event
func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -401,23 +446,28 @@ func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
}
}

// InsertExit adds a process exit event
func (db *DB) InsertExit(exit types.ProcessExitEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()

pid := exit.PIDs.Tgid
newRemoval := removalCandidate{
pid: pid,
exitTime: time.Now(),
exitCode: exit.ExitCode,
}

process, ok := db.processes[pid]
if !ok {
db.logger.Debugf("could not insert exit, pid %v not found in db", pid)
return
newRemoval.orphanTime = time.Now()
db.logger.Debugf("pid %v for exit event not found in db, adding as orphan", pid)
} else {
// If we already have the process, add our exit info
process.ExitCode = exit.ExitCode
db.processes[pid] = process
newRemoval.startTime = process.PIDs.StartTimeNS
}
process.ExitCode = exit.ExitCode
db.processes[pid] = process
heap.Push(&db.removalCandidates, removalCandidate{
pid: pid,
startTime: process.PIDs.StartTimeNS,
exitTime: time.Now(),
})
db.removalMap[pid] = newRemoval
}

func fullProcessFromDBProcess(p Process) types.Process {
Expand Down Expand Up @@ -610,8 +660,10 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {

process, ok := db.processes[pid]
if !ok {
db.stats.failedToFindProcessCount.Add(1)
return types.Process{}, errors.New("process not found")
}
db.stats.servedProcessCount.Add(1)

ret := fullProcessFromDBProcess(process)

Expand Down Expand Up @@ -695,7 +747,6 @@ func (db *DB) ScrapeProcfs() []uint32 {
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}

Expand Down
Loading
Loading