Skip to content

Commit

Permalink
Index workload per cgroup ID
Browse files Browse the repository at this point in the history
  • Loading branch information
lebauce committed Dec 4, 2024
1 parent 90969c4 commit 1075b97
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
9 changes: 5 additions & 4 deletions pkg/security/resolvers/cgroup/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,19 @@ type CacheEntry struct {
}

// NewCacheEntry returns a new instance of a CacheEntry
func NewCacheEntry(containerID containerutils.ContainerID, cgroupFlags uint64, pids ...uint32) (*CacheEntry, error) {
func NewCacheEntry(containerID containerutils.ContainerID, cgroupContext *model.CGroupContext, pids ...uint32) (*CacheEntry, error) {
newCGroup := CacheEntry{
Deleted: atomic.NewBool(false),
CGroupContext: model.CGroupContext{
CGroupFlags: containerutils.CGroupFlags(cgroupFlags),
},
ContainerContext: model.ContainerContext{
ContainerID: containerID,
},
PIDs: make(map[uint32]bool, 10),
}

if cgroupContext != nil {
newCGroup.CGroupContext = *cgroupContext
}

for _, pid := range pids {
newCGroup.PIDs[pid] = true
}
Expand Down
47 changes: 30 additions & 17 deletions pkg/security/resolvers/cgroup/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ const (
type ResolverInterface interface {
Start(context.Context)
AddPID(*model.ProcessCacheEntry)
GetWorkload(string) (*cgroupModel.CacheEntry, bool)
GetWorkload(containerutils.ContainerID) (*cgroupModel.CacheEntry, bool)
DelPID(uint32)
DelPIDWithID(string, uint32)
DelPIDWithID(containerutils.ContainerID, uint32)
Len() int
RegisterListener(Event, utils.Listener[*cgroupModel.CacheEntry]) error
}
Expand All @@ -48,15 +48,15 @@ type ResolverInterface interface {
type Resolver struct {
*utils.Notifier[Event, *cgroupModel.CacheEntry]
sync.RWMutex
workloads *simplelru.LRU[containerutils.ContainerID, *cgroupModel.CacheEntry]
workloads *simplelru.LRU[containerutils.CGroupID, *cgroupModel.CacheEntry]
}

// NewResolver returns a new cgroups monitor
func NewResolver() (*Resolver, error) {
cr := &Resolver{
Notifier: utils.NewNotifier[Event, *cgroupModel.CacheEntry](),
}
workloads, err := simplelru.NewLRU(1024, func(_ containerutils.ContainerID, value *cgroupModel.CacheEntry) {
workloads, err := simplelru.NewLRU(1024, func(_ containerutils.CGroupID, value *cgroupModel.CacheEntry) {
value.CallReleaseCallback()
value.Deleted.Store(true)

Expand All @@ -78,45 +78,56 @@ func (cr *Resolver) AddPID(process *model.ProcessCacheEntry) {
cr.Lock()
defer cr.Unlock()

entry, exists := cr.workloads.Get(process.ContainerID)
entry, exists := cr.workloads.Get(process.CGroup.CGroupID)
if exists {
entry.AddPID(process.Pid)
return
}

var err error
// create new entry now
newCGroup, err := cgroupModel.NewCacheEntry(process.ContainerID, uint64(process.CGroup.CGroupFlags), process.Pid)
newCGroup, err := cgroupModel.NewCacheEntry(process.ContainerID, &process.CGroup, process.Pid)
if err != nil {
seclog.Errorf("couldn't create new cgroup_resolver cache entry: %v", err)
return
}
newCGroup.CreatedAt = uint64(process.ProcessContext.ExecTime.UnixNano())

// add the new CGroup to the cache
cr.workloads.Add(process.ContainerID, newCGroup)
cr.workloads.Add(process.CGroup.CGroupID, newCGroup)

cr.NotifyListeners(CGroupCreated, newCGroup)
}

// Get returns the workload referenced by the provided ID
func (cr *Resolver) Get(id containerutils.CGroupID) (*cgroupModel.CacheEntry, bool) {
cr.RLock()
defer cr.RUnlock()

return cr.workloads.Get(id)
}

// GetWorkload returns the workload referenced by the provided ID
func (cr *Resolver) GetWorkload(id containerutils.ContainerID) (*cgroupModel.CacheEntry, bool) {
cr.RLock()
defer cr.RUnlock()

return cr.workloads.Get(id)
for _, workload := range cr.workloads.Values() {
if workload.ContainerID == id {
return workload, true
}
}

return nil, false
}

// DelPID removes a PID from the cgroup resolver
func (cr *Resolver) DelPID(pid uint32) {
cr.Lock()
defer cr.Unlock()

for _, id := range cr.workloads.Keys() {
entry, exists := cr.workloads.Get(id)
if exists {
cr.deleteWorkloadPID(pid, entry)
}
for _, workload := range cr.workloads.Values() {
cr.deleteWorkloadPID(pid, workload)
}
}

Expand All @@ -125,9 +136,11 @@ func (cr *Resolver) DelPIDWithID(id containerutils.ContainerID, pid uint32) {
cr.Lock()
defer cr.Unlock()

entry, exists := cr.workloads.Get(id)
if exists {
cr.deleteWorkloadPID(pid, entry)
for _, workload := range cr.workloads.Values() {
if workload.ContainerID == id {
cr.deleteWorkloadPID(pid, workload)
return
}
}
}

Expand All @@ -140,7 +153,7 @@ func (cr *Resolver) deleteWorkloadPID(pid uint32, workload *cgroupModel.CacheEnt

// check if the workload should be deleted
if len(workload.PIDs) <= 0 {
cr.workloads.Remove(workload.ContainerID)
cr.workloads.Remove(workload.CGroupID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/security/resolvers/process/resolver_ebpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (p *EBPFResolver) insertEntry(entry, prev *model.ProcessCacheEntry, source
prev.Release()
}

if p.cgroupResolver != nil && entry.ContainerID != "" {
if p.cgroupResolver != nil && entry.CGroup.CGroupID != "" {
// add the new PID in the right cgroup_resolver bucket
p.cgroupResolver.AddPID(entry)
}
Expand Down

0 comments on commit 1075b97

Please sign in to comment.