Skip to content

Commit

Permalink
Export node stats
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao committed Jan 16, 2025
1 parent 508612b commit 2ee899e
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 315 deletions.
240 changes: 97 additions & 143 deletions api/v1/runtime/common.pb.go

Large diffs are not rendered by default.

11 changes: 1 addition & 10 deletions api/v1/runtime/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ message NetflowDestination {
}

message PSIData {
double avg10 = 1;
double avg60 = 2;
double avg300 = 3;
uint64 total = 4;
uint64 total = 1;
}

message PSIStats {
Expand Down Expand Up @@ -194,15 +191,9 @@ message MemoryData {
}

message MemoryStats {
// memory used for cache
uint64 cache = 1;
// usage of memory
MemoryData usage = 2;
// usage of memory + swap
MemoryData swap_usage = 3;
// usage of swap only
MemoryData swap_only_usage = 4;
// usage of kernel memory
PSIStats psi = 8;
}

Expand Down
4 changes: 2 additions & 2 deletions charts/kvisor/values-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ agent:
pyroscope-addr: http://kvisord-pyroscope:4040
file-hash-enricher-enabled: true
signature-socks5-detection-enabled: true
container-stats-enabled: true
container-stats-scrape-interval: 5s
stats-enabled: true
stats-scrape-interval: 5s
ebpf-events-enabled: true
netflow-enabled: true
netflow-sample-submit-interval-seconds: 1
Expand Down
18 changes: 7 additions & 11 deletions cmd/agent/daemon/state/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewController(
enrichmentService: enrichmentService,
kubeClient: kubeClient,
nodeName: os.Getenv("NODE_NAME"),
resourcesStatsScrapePoints: map[uint64]*containerStatsScrapePoint{},
containerStatsScrapePoints: map[uint64]*containerStatsScrapePoint{},
mutedNamespaces: map[string]struct{}{},
dnsCache: dnsCache,
podCache: podCache,
Expand All @@ -157,8 +157,9 @@ type Controller struct {
nodeName string

// Scrape points are used to calculate deltas between scrapes.
resourcesStatsScrapePointsMu sync.RWMutex
resourcesStatsScrapePoints map[uint64]*containerStatsScrapePoint
containerStatsScrapePointsMu sync.RWMutex
containerStatsScrapePoints map[uint64]*containerStatsScrapePoint
nodeScrapePoint *nodeScrapePoint

mutedNamespacesMu sync.RWMutex
mutedNamespaces map[string]struct{}
Expand Down Expand Up @@ -216,20 +217,15 @@ func (c *Controller) onNewContainer(container *containers.Container) {
}

// onDeleteContainer drops the per-container state kept by the controller when
// a container goes away: the stats scrape point stored under the container's
// cgroup ID (removed while holding the scrape-points mutex) and the DNS cache
// entry for the same cgroup ID. The removal is logged at debug level.
func (c *Controller) onDeleteContainer(container *containers.Container) {
c.resourcesStatsScrapePointsMu.Lock()
delete(c.resourcesStatsScrapePoints, container.CgroupID)
c.resourcesStatsScrapePointsMu.Unlock()
c.containerStatsScrapePointsMu.Lock()
delete(c.containerStatsScrapePoints, container.CgroupID)
c.containerStatsScrapePointsMu.Unlock()

c.dnsCache.Remove(container.CgroupID)

c.log.Debugf("removed cgroup %d", container.CgroupID)
}

type containerStatsScrapePoint struct {
ts time.Time
cpuStat *castaipb.CpuStats
}

func (c *Controller) MuteNamespace(namespace string) error {
c.mutedNamespacesMu.Lock()
c.mutedNamespaces[namespace] = struct{}{}
Expand Down
182 changes: 129 additions & 53 deletions cmd/agent/daemon/state/stats_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,46 @@ import (
"github.com/castai/kvisor/pkg/containers"
)

// containerStatsScrapePoint holds the cgroup stats captured for one container
// on its most recent scrape. The controller keeps one per cgroup ID so that the
// next scrape can be diffed against it.
type containerStatsScrapePoint struct {
// cpuStat is the CPU stats from the last scrape.
cpuStat *castaipb.CpuStats
// memStat is the memory stats from the last scrape.
memStat *castaipb.MemoryStats
// ioStat is the IO stats from the last scrape.
ioStat *castaipb.IOStats
}

// nodeScrapePoint holds the node-level stats captured on the most recent node
// scrape. The controller keeps a single instance so that the next scrape can be
// diffed against it.
type nodeScrapePoint struct {
// cpuStat is the node CPU stats from the last scrape.
cpuStat *castaipb.CpuStats
// memStat is the node memory stats from the last scrape.
memStat *castaipb.MemoryStats
// ioStat is the node IO stats from the last scrape.
ioStat *castaipb.IOStats
}

// runStatsPipeline periodically scrapes node and container stats and hands them
// to the configured stats exporters.
//
// It first performs one scrape with a nil batch, which only records the initial
// scrape points, and then scrapes on every tick of a ticker that fires at
// c.cfg.StatsScrapeInterval. A batch is enqueued to every exporter in
// c.exporters.Stats only when it contains at least one item.
//
// The loop runs until ctx is cancelled, at which point ctx.Err() is returned.
func (c *Controller) runStatsPipeline(ctx context.Context) error {
c.log.Info("running stats pipeline")
defer c.log.Info("stats pipeline done")

ticker := time.NewTicker(c.cfg.StatsScrapeInterval)
defer ticker.Stop()

// Initial scrape to populate container metrics for cpu diff.
// Initial scrape to populate initial points for diffs.
c.scrapeNodeStats(nil)
c.scrapeContainersResourceStats(nil)

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
batch := &castaipb.StatsBatch{}
c.scrapeNodeStats(batch)
c.scrapeContainersResourceStats(batch)
if len(batch.Items) > 0 {
for _, exp := range c.exporters.Stats {
exp.Enqueue(batch)
func() {
start := time.Now()
batch := &castaipb.StatsBatch{}
c.scrapeNodeStats(batch)
c.scrapeContainersResourceStats(batch)
if len(batch.Items) > 0 {
for _, exp := range c.exporters.Stats {
exp.Enqueue(batch)
}
}
}
// Log how long the scrape and enqueue took.
c.log.Debugf("stats exported, duration=%v", time.Since(start))
}()
}
}
}
Expand All @@ -44,7 +61,6 @@ func (c *Controller) scrapeContainersResourceStats(batch *castaipb.StatsBatch) {
}

func (c *Controller) scrapeContainerResourcesStats(cont *containers.Container, batch *castaipb.StatsBatch) {
now := time.Now().UTC()
cgStats, err := c.containersClient.GetCgroupStats(cont)
if err != nil {
if c.log.IsEnabled(slog.LevelDebug) {
Expand All @@ -55,18 +71,21 @@ func (c *Controller) scrapeContainerResourcesStats(cont *containers.Container, b
}

currScrape := &containerStatsScrapePoint{
ts: now,
cpuStat: cgStats.CpuStats,
memStat: cgStats.MemoryStats,
ioStat: cgStats.IOStats,
}

// We need at least 2 scrapes to calculate cpu diff count.
c.resourcesStatsScrapePointsMu.RLock()
prevScrape, found := c.resourcesStatsScrapePoints[cont.CgroupID]
c.resourcesStatsScrapePointsMu.RUnlock()
// We need at least 2 scrapes to calculate diffs.
// Diffs are only needed for monotonically increasing counters, because we store those as deltas.
// This includes cpu usage and psi total value.
c.containerStatsScrapePointsMu.RLock()
prevScrape, found := c.containerStatsScrapePoints[cont.CgroupID]
c.containerStatsScrapePointsMu.RUnlock()
if !found {
c.resourcesStatsScrapePointsMu.Lock()
c.resourcesStatsScrapePoints[cont.CgroupID] = currScrape
c.resourcesStatsScrapePointsMu.Unlock()
c.containerStatsScrapePointsMu.Lock()
c.containerStatsScrapePoints[cont.CgroupID] = currScrape
c.containerStatsScrapePointsMu.Unlock()
return
}

Expand All @@ -80,10 +99,10 @@ func (c *Controller) scrapeContainerResourcesStats(cont *containers.Container, b
ContainerName: cont.Name,
PodUid: cont.PodUID,
ContainerId: cont.ID,
CpuStats: getCPUStatsDiff(prevScrape, currScrape),
MemoryStats: cgStats.MemoryStats,
CpuStats: getCPUStatsDiff(prevScrape.cpuStat, currScrape.cpuStat),
MemoryStats: getMemoryStatsDiff(prevScrape.memStat, currScrape.memStat),
PidsStats: cgStats.PidsStats,
IoStats: cgStats.IOStats,
IoStats: getIOStatsDiff(prevScrape.ioStat, currScrape.ioStat),
}
if podInfo, ok := c.getPodInfo(cont.PodUID); ok {
item.NodeName = podInfo.NodeName
Expand All @@ -92,38 +111,69 @@ func (c *Controller) scrapeContainerResourcesStats(cont *containers.Container, b
}
batch.Items = append(batch.Items, &castaipb.StatsItem{Data: &castaipb.StatsItem_Container{Container: item}})

prevScrape.ts = currScrape.ts
prevScrape.cpuStat = currScrape.cpuStat
prevScrape.memStat = currScrape.memStat
prevScrape.ioStat = currScrape.ioStat
}

func (c *Controller) scrapeNodeStats(batch *castaipb.StatsBatch) {
item := &castaipb.NodeStats{}
if err := func() error {
if c.procHandler.PSIEnabled() {
cpuPSI, err := c.procHandler.GetPSIStats("cpu")
if err != nil {
return err
}
item.CpuStats = &castaipb.CpuStats{Psi: cpuPSI}

memStats, err := c.procHandler.GetMeminfoStats()
if err != nil {
return err
}
item.MemoryStats = memStats
memoryPSI, err := c.procHandler.GetPSIStats("memory")
if err != nil {
return err
}
item.MemoryStats.Psi = memoryPSI

ioPSI, err := c.procHandler.GetPSIStats("io")
if err != nil {
return err
}
item.IoStats = &castaipb.IOStats{Psi: ioPSI}
batch.Items = append(batch.Items, &castaipb.StatsItem{Data: &castaipb.StatsItem_Node{Node: item}})
// For now, we only care about PSI related metrics on node.
if !c.procHandler.PSIEnabled() {
return nil
}

cpuPSI, err := c.procHandler.GetPSIStats("cpu")
if err != nil {
return err
}
memStats, err := c.procHandler.GetMeminfoStats()
if err != nil {
return err
}
memoryPSI, err := c.procHandler.GetPSIStats("memory")
if err != nil {
return err
}
ioPSI, err := c.procHandler.GetPSIStats("io")
if err != nil {
return err
}

currScrape := &nodeScrapePoint{
cpuStat: &castaipb.CpuStats{Psi: cpuPSI},
memStat: &castaipb.MemoryStats{
Usage: memStats.Usage,
SwapOnlyUsage: memStats.SwapOnlyUsage,
Psi: memoryPSI,
},
ioStat: &castaipb.IOStats{Psi: ioPSI},
}

// We need at least 2 scrapes to calculate diffs.
// Diffs are only needed for monotonically increasing counters, because we store those as deltas.
// This includes cpu usage and psi total value.
if c.nodeScrapePoint == nil {
c.nodeScrapePoint = currScrape
return nil
}
if batch == nil {
return nil
}

batch.Items = append(batch.Items, &castaipb.StatsItem{Data: &castaipb.StatsItem_Node{
Node: &castaipb.NodeStats{
NodeName: c.nodeName,
CpuStats: getCPUStatsDiff(c.nodeScrapePoint.cpuStat, currScrape.cpuStat),
MemoryStats: getMemoryStatsDiff(c.nodeScrapePoint.memStat, currScrape.memStat),
IoStats: getIOStatsDiff(c.nodeScrapePoint.ioStat, currScrape.ioStat),
},
}})

c.nodeScrapePoint.cpuStat = currScrape.cpuStat
c.nodeScrapePoint.memStat = currScrape.memStat
c.nodeScrapePoint.ioStat = currScrape.ioStat

return nil
}(); err != nil {
if c.log.IsEnabled(slog.LevelDebug) {
Expand All @@ -134,13 +184,39 @@ func (c *Controller) scrapeNodeStats(batch *castaipb.StatsBatch) {
}
}

func getCPUStatsDiff(prev, curr *containerStatsScrapePoint) *castaipb.CpuStats {
func getCPUStatsDiff(prev, curr *castaipb.CpuStats) *castaipb.CpuStats {
return &castaipb.CpuStats{
TotalUsage: curr.cpuStat.TotalUsage - prev.cpuStat.TotalUsage,
UsageInKernelmode: curr.cpuStat.UsageInKernelmode - prev.cpuStat.UsageInKernelmode,
UsageInUsermode: curr.cpuStat.UsageInUsermode - prev.cpuStat.UsageInUsermode,
ThrottledPeriods: curr.cpuStat.ThrottledPeriods,
ThrottledTime: curr.cpuStat.ThrottledTime,
Psi: curr.cpuStat.Psi,
TotalUsage: curr.TotalUsage - prev.TotalUsage,
UsageInKernelmode: curr.UsageInKernelmode - prev.UsageInKernelmode,
UsageInUsermode: curr.UsageInUsermode - prev.UsageInUsermode,
ThrottledPeriods: curr.ThrottledPeriods,
ThrottledTime: curr.ThrottledTime,
Psi: getPSIStatsDiff(prev.Psi, curr.Psi),
}
}

// getMemoryStatsDiff builds the memory stats to export for one scrape interval.
//
// Cache, Usage and SwapOnlyUsage are point-in-time values and are copied from
// curr unchanged. Psi is a growing counter, so it is reported as the delta
// between prev and curr (see getPSIStatsDiff).
//
// It returns nil when curr is nil, i.e. there is nothing to report for this
// scrape. When prev is nil there is no baseline to diff against, so the
// point-in-time values are still returned but Psi is left unset instead of
// dereferencing a nil pointer.
func getMemoryStatsDiff(prev, curr *castaipb.MemoryStats) *castaipb.MemoryStats {
	if curr == nil {
		return nil
	}

	diff := &castaipb.MemoryStats{
		Cache:         curr.Cache,
		Usage:         curr.Usage,
		SwapOnlyUsage: curr.SwapOnlyUsage,
	}
	// A PSI delta only makes sense with a previous scrape to compare against.
	if prev != nil {
		diff.Psi = getPSIStatsDiff(prev.Psi, curr.Psi)
	}
	return diff
}

// getIOStatsDiff builds the IO stats to export for one scrape interval.
//
// Only the PSI counters are carried; they are reported as the delta between
// prev and curr (see getPSIStatsDiff).
//
// It returns nil when either scrape has no IO stats: without both points there
// is nothing to diff, and dereferencing the missing one would panic.
func getIOStatsDiff(prev, curr *castaipb.IOStats) *castaipb.IOStats {
	if prev == nil || curr == nil {
		return nil
	}
	return &castaipb.IOStats{
		Psi: getPSIStatsDiff(prev.Psi, curr.Psi),
	}
}

// getPSIStatsDiff returns the growth of the PSI "some" and "full" total
// counters between two scrapes.
//
// It returns nil when either prev or curr is nil, because a delta cannot be
// computed without both points. A missing Some or Full entry on either side is
// read as a total of 0 through the nil-safe generated getters rather than
// being dereferenced.
// NOTE(review): whether the PSI source can actually omit a line (e.g. the
// "full" line for cpu on older kernels) is not visible here — confirm against
// the PSI parser.
func getPSIStatsDiff(prev, curr *castaipb.PSIStats) *castaipb.PSIStats {
	if prev == nil || curr == nil {
		return nil
	}
	return &castaipb.PSIStats{
		Some: &castaipb.PSIData{
			Total: psiTotalDelta(prev.GetSome().GetTotal(), curr.GetSome().GetTotal()),
		},
		Full: &castaipb.PSIData{
			Total: psiTotalDelta(prev.GetFull().GetTotal(), curr.GetFull().GetTotal()),
		},
	}
}

// psiTotalDelta returns curr - prev for a counter that is expected to only
// grow. If the counter went backwards, it returns 0 instead of letting the
// unsigned subtraction wrap around to a huge bogus delta.
func psiTotalDelta(prev, curr uint64) uint64 {
	if curr < prev {
		return 0
	}
	return curr - prev
}
2 changes: 1 addition & 1 deletion cmd/controller/state/castai_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (t testGrpcClient) LogsWriteStream(ctx context.Context, opts ...grpc.CallOp
return nil, nil
}

func (t testGrpcClient) ContainerStatsWriteStream(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_ContainerStatsWriteStreamClient, error) {
func (t testGrpcClient) StatsWriteStream(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_StatsWriteStreamClient, error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/mock-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (m *MockServer) StatsWriteStream(server castaipb.RuntimeSecurityAgentAPI_St
}
node := v.GetNode()
if node != nil {
m.log.Debugf("node_stats, node=%s,cpu=%v, mem=%v", node.NodeName, cont.CpuStats, cont.MemoryStats)
m.log.Debugf("node_stats, node=%s,cpu=%v, mem=%v", node.NodeName, node.CpuStats, node.MemoryStats)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/castai/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (*testServer) KubernetesDeltaIngest(castaipb.RuntimeSecurityAgentAPI_Kubern
panic("unimplemented")
}

func (t *testServer) ContainerStatsWriteStream(server castaipb.RuntimeSecurityAgentAPI_ContainerStatsWriteStreamServer) error {
func (t *testServer) StatsWriteStream(server castaipb.RuntimeSecurityAgentAPI_StatsWriteStreamServer) error {
//TODO implement me
panic("implement me")
}
Expand Down
Loading

0 comments on commit 2ee899e

Please sign in to comment.