Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle leak of process info in hostfs provider for add_session_metadata #42398

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

const (
Expand Down Expand Up @@ -58,7 +59,7 @@ func New(cfg *cfg.C) (beat.Processor, error) {

ctx, cancel := context.WithCancel(context.Background())
reader := procfs.NewProcfsReader(*logger)
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(monitoring.Default, reader, *logger, c.DBReaperPeriod, c.ReapProcesses)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create DB: %w", err)
Expand Down Expand Up @@ -182,7 +183,7 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {
fullProcess, err = p.db.GetProcess(pid)
if err != nil {
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
p.logger.Debugw("PID not found in provider", "pid", pid, "error", err)
p.logger.Debugf("PID %d not found in provider: %s", pid, err)
return nil, e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package sessionmd

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
)

var (
Expand Down Expand Up @@ -340,7 +342,7 @@ func TestEnrich(t *testing.T) {
for _, tt := range enrichTests {
t.Run(tt.testName, func(t *testing.T) {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
db, err := processdb.NewDB(monitoring.NewRegistry(), reader, *logger, time.Second*30, false)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
Expand Down
13 changes: 9 additions & 4 deletions x-pack/auditbeat/processors/sessionmd/config.go
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@

package sessionmd

import "time"

// Config for add_session_metadata processor.
type config struct {
Backend string `config:"backend"`
PIDField string `config:"pid_field"`
Backend string `config:"backend"`
PIDField string `config:"pid_field"`
DBReaperPeriod time.Duration `config:"db_reaper_period"`
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
ReapProcesses bool `config:"reap_processes"`
}

func defaultConfig() config {
return config{
Backend: "auto",
PIDField: "process.pid",
Backend: "auto",
PIDField: "process.pid",
DBReaperPeriod: time.Second * 30,
}
}
97 changes: 74 additions & 23 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package processdb

import (
"container/heap"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/timeutils"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

type EntryType string
Expand Down Expand Up @@ -63,14 +63,18 @@ const (
)

type Process struct {
Tgid int
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
PIDs types.PIDInfo
Creds types.CredInfo
CTTY tty.TTYDev
Argv []string
Cwd string
Env map[string]string
Filename string
ExitCode int32

// ProcfsLookupFail is true if procfs couldn't find a matching PID for us
ProcfsLookupFail bool
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
InsertTime time.Time
}

var (
Expand Down Expand Up @@ -175,10 +179,24 @@ type DB struct {
entryLeaderRelationships map[uint32]uint32
procfs procfs.Reader
stopChan chan struct{}
removalCandidates rcHeap
removalMap map[uint32]removalCandidate
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

// used for metrics reporting
stats *Stats

// knobs for the reaper thread follows

// how often the reaper runs
reaperPeriod time.Duration
// used for testing
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
skipReaper bool
// the reaper will remove processes events, an well as exit events
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
reapProcesses bool
// the age at which we'll reap a process that has no procfs data
processReapAfter time.Duration
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
func NewDB(metrics *monitoring.Registry, reader procfs.Reader, logger logp.Logger, reaperPeriod time.Duration, reapProcesses bool) (*DB, error) {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
once.Do(initialize)
if initError != nil {
return &DB{}, initError
Expand All @@ -190,7 +208,16 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
entryLeaderRelationships: make(map[uint32]uint32),
procfs: reader,
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
removalMap: make(map[uint32]removalCandidate),
reaperPeriod: reaperPeriod,
skipReaper: false,
stats: NewStats(metrics),
reapProcesses: reapProcesses,
processReapAfter: time.Minute * 10,
}
logger.Infof("starting sessionDB reaper with interval %s", db.reaperPeriod)
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
if db.reapProcesses {
logger.Info("WARNING: reaping orphaned processes. May result in data loss.")
}
db.startReaper()
return &db, nil
Expand Down Expand Up @@ -260,18 +287,35 @@ func (db *DB) insertProcess(process Process) {
}
}

// InsertExec adds an exec event
func (db *DB) InsertExec(exec types.ProcessExecEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()

proc := Process{
PIDs: pidInfoFromProto(exec.PIDs),
Creds: credInfoFromProto(exec.Creds),
CTTY: ttyDevFromProto(exec.CTTY),
Argv: exec.Argv,
Cwd: exec.CWD,
Env: exec.Env,
Filename: exec.Filename,
PIDs: pidInfoFromProto(exec.PIDs),
Creds: credInfoFromProto(exec.Creds),
CTTY: ttyDevFromProto(exec.CTTY),
Argv: exec.Argv,
Cwd: exec.CWD,
Filename: exec.Filename,
ProcfsLookupFail: exec.ProcfsLookupFail,
InsertTime: time.Now(),
}
if proc.ProcfsLookupFail {
db.stats.procfsLookupFail.Add(1)
}

// check to see if an orphaned exit event maps to this exec event.
// the out-of-order problem where we get the exit before the exec usually happens under load.
// if we don't track orphaned processes like this, we'll never scrub them from the DB.
if evt, ok := db.removalMap[proc.PIDs.Tgid]; ok {
proc.ExitCode = evt.exitCode
db.stats.resolvedOrphanExits.Add(1)
db.logger.Debugf("resolved orphan exit for pid %d", proc.PIDs.Tgid)
//evt.orphanTime // set it to remove on the next reaper pass
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
evt.startTime = proc.PIDs.StartTimeNS
db.removalMap[proc.PIDs.Tgid] = evt
}

db.processes[exec.PIDs.Tgid] = proc
Expand Down Expand Up @@ -387,6 +431,7 @@ func (db *DB) evaluateEntryLeader(p Process) *uint32 {
return nil
}

// InsertSetsid adds a set SID event
func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -401,23 +446,28 @@ func (db *DB) InsertSetsid(setsid types.ProcessSetsidEvent) {
}
}

// InsertExit adds a process exit event
func (db *DB) InsertExit(exit types.ProcessExitEvent) {
db.mutex.Lock()
defer db.mutex.Unlock()

pid := exit.PIDs.Tgid
newRemoval := removalCandidate{
pid: pid,
exitTime: time.Now(),
exitCode: exit.ExitCode,
}

process, ok := db.processes[pid]
if !ok {
db.logger.Debugf("could not insert exit, pid %v not found in db", pid)
return
newRemoval.orphanTime = time.Now()
db.logger.Debugf("pid %v for exit event not found in db, adding as orphan", pid)
} else {
// If we already have the process, add our exit info
process.ExitCode = exit.ExitCode
db.processes[pid] = process
newRemoval.startTime = process.PIDs.StartTimeNS
}
process.ExitCode = exit.ExitCode
db.processes[pid] = process
heap.Push(&db.removalCandidates, removalCandidate{
pid: pid,
startTime: process.PIDs.StartTimeNS,
exitTime: time.Now(),
})
db.removalMap[pid] = newRemoval
}

func fullProcessFromDBProcess(p Process) types.Process {
Expand Down Expand Up @@ -610,8 +660,10 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {

process, ok := db.processes[pid]
if !ok {
db.stats.failedToFindProcessCount.Add(1)
return types.Process{}, errors.New("process not found")
}
db.stats.servedProcessCount.Add(1)

ret := fullProcessFromDBProcess(process)

Expand Down Expand Up @@ -695,7 +747,6 @@ func (db *DB) ScrapeProcfs() []uint32 {
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}

Expand Down
Loading
Loading