Skip to content

Commit

Permalink
Use socket cookie for cgroup skb task context (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Sep 2, 2024
1 parent bb09f43 commit c76ae7e
Show file tree
Hide file tree
Showing 18 changed files with 774 additions and 1,222 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/daemon/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (a *App) Run(ctx context.Context) error {
exporters = state.NewExporters(log)
}

kubeAPIServiceConn, err := grpc.Dial(
kubeAPIServiceConn, err := grpc.NewClient(
cfg.KubeAPIServiceAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
Expand Down
91 changes: 62 additions & 29 deletions e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,32 @@ func run(ctx context.Context) error {
srv.eventsAsserted = true
srv.events = nil

fmt.Println("🙏waiting for process tree events")
if err := srv.assertProcessTreeEvents(ctx); err != nil {
return fmt.Errorf("assert process tree events: %w", err)
}
srv.processTreeEventsAsserted = true
srv.processTreeEvents = nil

fmt.Println("🙏waiting for netflows")
if err := srv.assertNetflows(ctx); err != nil {
return fmt.Errorf("assert netflows: %w", err)
}
srv.netflowsAsserted = true
srv.netflows = nil

fmt.Println("🙏waiting for container stats")
if err := srv.assertContainerStats(ctx); err != nil {
return fmt.Errorf("assert container stats: %w", err)
}
srv.containerStatsAsserted = true
srv.containerStats = nil

fmt.Println("🙏waiting for kubernetes deltas")
if err := srv.assertKubernetesDeltas(ctx); err != nil {
return fmt.Errorf("assert k8s deltas: %w", err)
}
srv.deltaUpdates = nil

fmt.Println("🙏waiting for kube bench")
if err := srv.assertKubeBenchReport(ctx); err != nil {
Expand All @@ -123,21 +135,19 @@ func run(ctx context.Context) error {
if err := srv.assertKubeLinter(ctx); err != nil {
return fmt.Errorf("assert kube linter: %w", err)
}
srv.kubeLinterReports = nil

fmt.Println("🙏waiting for image metadata")
if err := srv.assertImageMetadata(ctx); err != nil {
return fmt.Errorf("assert image metadata: %w", err)
}
srv.imageMetadatas = nil

fmt.Println("🙏waiting for flogs")
if err := srv.assertLogs(ctx); err != nil {
return fmt.Errorf("assert logs: %w", err)
}

fmt.Println("🙏waiting for process tree events")
if err := srv.assertProcessTreeEvents(ctx); err != nil {
return fmt.Errorf("assert process tree events: %w", err)
}
srv.logs = nil

fmt.Println("👌e2e finished")

Expand Down Expand Up @@ -201,19 +211,23 @@ var _ castaipb.RuntimeSecurityAgentAPIServer = (*testCASTAIServer)(nil)
type testCASTAIServer struct {
clientset *kubernetes.Clientset

mu sync.Mutex
containerStats []*castaipb.ContainerStatsBatch
events []*castaipb.Event
eventsAsserted bool
logs []*castaipb.LogEvent
deltaUpdates []*castaipb.KubernetesDeltaItem
imageMetadatas []*castaipb.ImageMetadata
kubeBenchReports []*castaipb.KubeBenchReport
kubeLinterReports []*castaipb.KubeLinterReport
processTreeEvents []*castaipb.ProcessTreeEvent
controllerConfig []byte
agentConfig []byte
netflows []*castaipb.Netflow
mu sync.Mutex
containerStats []*castaipb.ContainerStatsBatch
containerStatsAsserted bool
events []*castaipb.Event
eventsAsserted bool
logs []*castaipb.LogEvent
deltaUpdates []*castaipb.KubernetesDeltaItem
imageMetadatas []*castaipb.ImageMetadata
kubeBenchReports []*castaipb.KubeBenchReport
kubeLinterReports []*castaipb.KubeLinterReport
processTreeEvents []*castaipb.ProcessTreeEvent
processTreeEventsAsserted bool
controllerConfig []byte
agentConfig []byte
netflows []*castaipb.Netflow
netflowsAsserted bool
outputReceivedData bool
}

func (t *testCASTAIServer) ProcessEventsWriteStream(server castaipb.RuntimeSecurityAgentAPI_ProcessEventsWriteStreamServer) error {
Expand Down Expand Up @@ -241,10 +255,13 @@ func (t *testCASTAIServer) ProcessEventsWriteStream(server castaipb.RuntimeSecur
if err != nil {
return err
}
if t.processTreeEventsAsserted {
continue
}
data, err := protojson.Marshal(event)
if err != nil {
fmt.Println("received process tree event (cannot marshall to json):", event)
} else {
} else if t.outputReceivedData {
fmt.Println("received process tree event:", string(data))
}
t.mu.Lock()
Expand All @@ -259,7 +276,12 @@ func (t *testCASTAIServer) NetflowWriteStream(server castaipb.RuntimeSecurityAge
if err != nil {
return err
}
fmt.Printf("received netflow: %+v\n", msg)
if t.netflowsAsserted {
continue
}
if t.outputReceivedData {
fmt.Printf("received netflow: %+v\n", msg)
}
t.mu.Lock()
t.netflows = append(t.netflows, msg)
t.mu.Unlock()
Expand Down Expand Up @@ -299,7 +321,12 @@ func (t *testCASTAIServer) ContainerStatsWriteStream(server castaipb.RuntimeSecu
if err != nil {
return err
}
fmt.Println("received container stats:", len(msg.Items))
if t.containerStatsAsserted {
continue
}
if t.outputReceivedData {
fmt.Println("received container stats:", len(msg.Items))
}
t.mu.Lock()
t.containerStats = append(t.containerStats, msg)
t.mu.Unlock()
Expand Down Expand Up @@ -335,8 +362,9 @@ func (t *testCASTAIServer) GetConfiguration(ctx context.Context, req *castaipb.G
if v := req.GetAgent(); v != nil {
t.agentConfig = v
}

fmt.Printf("received configs:\ncontroller=%v\n agent=%v\n", t.controllerConfig, t.agentConfig)
if t.outputReceivedData {
fmt.Printf("received configs:\ncontroller=%v\n agent=%v\n", t.controllerConfig, t.agentConfig)
}

return &castaipb.GetConfigurationResponse{}, nil
}
Expand Down Expand Up @@ -366,12 +394,15 @@ func (t *testCASTAIServer) EventsWriteStream(server castaipb.RuntimeSecurityAgen
if err != nil {
return err
}
fmt.Println("received event:", event)
if !t.eventsAsserted {
t.mu.Lock()
t.events = append(t.events, event)
t.mu.Unlock()
if t.outputReceivedData {
fmt.Printf("received event: %+v\n", event)
}
if t.eventsAsserted {
continue
}
t.mu.Lock()
t.events = append(t.events, event)
t.mu.Unlock()
}
}

Expand All @@ -381,7 +412,9 @@ func (t *testCASTAIServer) LogsWriteStream(server castaipb.RuntimeSecurityAgentA
if err != nil {
return err
}
fmt.Println("received log:", event)
if t.outputReceivedData {
fmt.Println("received log:", event)
}
t.mu.Lock()
t.logs = append(t.logs, event)
t.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions e2e/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ while true; do
done

if [[ $job_result -eq 1 ]]; then
echo "Job failed!"
echo "😞 Job failed! Try to run locally and good luck 🤞: KIND_CONTEXT=tilt IMAGE_TAG=local ./e2e/run.sh"
exit 1
fi
echo "Job succeeded!"
echo "👌 Job succeeded!"
Loading

0 comments on commit c76ae7e

Please sign in to comment.