Skip to content

Commit

Permalink
Use ebpf ringbuf (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Dec 5, 2024
1 parent 8addfcc commit 7b61e8b
Show file tree
Hide file tree
Showing 27 changed files with 1,013 additions and 931 deletions.
18 changes: 9 additions & 9 deletions cmd/agent/daemon/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ type Config struct {
State state.Config `json:"state"`
ContainerStatsEnabled bool `json:"containerStatsEnabled"`
EBPFEventsEnabled bool `json:"EBPFEventsEnabled"`
EBPFEventsPerCPUBuffer int `validate:"required" json:"EBPFEventsPerCPUBuffer"`
EBPFEventsOutputChanSize int `validate:"required" json:"EBPFEventsOutputChanSize"`
EBPFEventsStdioExporterEnabled bool `json:"EBPFEventsStdioExporterEnabled"`
EBPFMetrics EBPFMetricsConfig `json:"EBPFMetrics"`
EBPFEventsPolicyConfig ebpftracer.EventsPolicyConfig `json:"EBPFEventsPolicyConfig"`
EBPFSignalEventsRingBufferSize uint32 `json:"EBPFSignalEventsRingBufferSize"`
EBPFEventsRingBufferSize uint32 `json:"EBPFEventsRingBufferSize"`
EBPFSkbEventsRingBufferSize uint32 `json:"EBPFSkbEventsRingBufferSize"`
MutedNamespaces []string `json:"mutedNamespaces"`
SignatureEngineConfig signature.SignatureEngineConfig `json:"signatureEngineConfig"`
Castai castai.Config `json:"castai"`
Expand All @@ -88,7 +90,6 @@ type EnricherConfig struct {
type NetflowConfig struct {
Enabled bool `json:"enabled"`
SampleSubmitIntervalSeconds uint64 `json:"sampleSubmitIntervalSeconds"`
OutputChanSize int `json:"outputChanSize"`
Grouping ebpftracer.NetflowGrouping `json:"grouping"`
}

Expand Down Expand Up @@ -285,7 +286,9 @@ func (a *App) Run(ctx context.Context) error {

tracer := ebpftracer.New(log, ebpftracer.Config{
BTFPath: a.cfg.BTFPath,
EventsPerCPUBuffer: a.cfg.EBPFEventsPerCPUBuffer,
SignalEventsRingBufferSize: a.cfg.EBPFSignalEventsRingBufferSize,
EventsRingBufferSize: a.cfg.EBPFEventsRingBufferSize,
SkbEventsRingBufferSize: a.cfg.EBPFSkbEventsRingBufferSize,
EventsOutputChanSize: a.cfg.EBPFEventsOutputChanSize,
DefaultCgroupsVersion: cgroupClient.DefaultCgroupVersion().String(),
ContainerClient: containersClient,
Expand All @@ -294,7 +297,6 @@ func (a *App) Run(ctx context.Context) error {
SignatureEngine: signatureEngine,
MountNamespacePIDStore: mountNamespacePIDStore,
HomePIDNS: pidNSID,
NetflowOutputChanSize: a.cfg.Netflow.OutputChanSize,
NetflowSampleSubmitIntervalSeconds: a.cfg.Netflow.SampleSubmitIntervalSeconds,
NetflowGrouping: a.cfg.Netflow.Grouping,
TrackSyscallStats: cfg.ContainerStatsEnabled,
Expand All @@ -303,6 +305,7 @@ func (a *App) Run(ctx context.Context) error {
ProgramMetricsEnabled: cfg.EBPFMetrics.ProgramMetricsEnabled,
TracerMetricsEnabled: cfg.EBPFMetrics.TracerMetricsEnabled,
},
PodName: podName,
})
if err := tracer.Load(); err != nil {
return fmt.Errorf("loading tracer: %w", err)
Expand Down Expand Up @@ -417,11 +420,8 @@ func buildEBPFPolicy(log *logging.Logger, cfg *Config, exporters *state.Exporter
switch enabledEvent {
case events.SockSetState:
policy.Events = append(policy.Events, &ebpftracer.EventPolicy{
ID: events.SockSetState,
FilterGenerator: ebpftracer.RateLimitPrivateIP(ebpftracer.RateLimitPolicy{
Rate: 100,
Burst: 1,
}),
ID: events.SockSetState,
FilterGenerator: ebpftracer.SkipPrivateIP(), // TODO: Move private ip skip to kernel side.
})
case events.NetPacketDNSBase:
policy.Events = append(policy.Events, dnsEventPolicy)
Expand Down
19 changes: 11 additions & 8 deletions cmd/agent/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,20 @@ func NewRunCommand(version string) *cobra.Command {
events.NetPacketDNSBase,
events.MagicWrite,
events.ProcessOomKilled,
events.StdioViaSocket,
// events.StdioViaSocket, // TODO(anjmao): Tracing this event via syscall hooks is very expensive. Rework the whole syscall tracing.
events.TtyWrite,
events.NetPacketSSHBase,
},
}
ebpfEventsStdioExporterEnabled = command.Flags().Bool("ebpf-events-stdio-exporter-enabled", false, "Export ebpf event to stdio")
ebpfEventsPerCPUBuffer = command.Flags().Int("ebpf-events-per-cpu-buffer", os.Getpagesize()*64, "Ebpf per cpu buffer size")
ebpfEventsOutputChanSize = command.Flags().Int("ebpf-events-output-queue-size", 4096, "Ebpf user spaces output channel size")
ebpfTracerMetricsEnabled = command.Flags().Bool("ebpf-tracer-metrics-enabled", false, "Enables the export of tracer related metrics from eBPF")
ebpfTracerMetricsEnabled = command.Flags().Bool("ebpf-tracer-metrics-enabled", true, "Enables the export of tracer related metrics from eBPF")
ebpfProgramMetricsEnabled = command.Flags().Bool("ebpf-program-metrics-enabled", false, "Enables the export of metrics about eBPF programs")

EBPFSignalEventsRingBufferSize = command.Flags().Uint32("ebpf-signal-events-ring-buffer-size", 1<<20, "Ebpf ring buffer size in bytes for priority events. Should be power of 2")
EBPFEventsRingBufferSize = command.Flags().Uint32("ebpf-events-ring-buffer-size", 1<<20, "Ebpf ring buffer size in bytes for events. Should be power of 2")
EBPFSkbEventsRingBufferSize = command.Flags().Uint32("ebpf-skb-events-ring-buffer-size", 1<<20, "Ebpf ring buffer size in bytes for skb network events. Should be power of 2")

mutedNamespaces = command.Flags().StringSlice("ignored-namespaces", []string{"kube-system", "calico", "calico-system"},
"List of namespaces to ignore tracing events for. To ignore multiple namespaces, separate by comma or pass flag multiple times."+
" For example: --ignored-namespaces=kube-system,calico-system")
Expand All @@ -89,7 +92,6 @@ func NewRunCommand(version string) *cobra.Command {

netflowEnabled = command.Flags().Bool("netflow-enabled", false, "Enables netflow tracking")
netflowSampleSubmitIntervalSeconds = command.Flags().Uint64("netflow-sample-submit-interval-seconds", 15, "Netflow sample submit interval")
netflowOutputChanSize = command.Flags().Int("netflow-output-queue-size", 4096, "Netflow output queue size")
netflowExportInterval = command.Flags().Duration("netflow-export-interval", 15*time.Second, "Netflow export interval")
netflowGrouping = ebpftracer.NetflowGroupingDropSrcPort

Expand Down Expand Up @@ -152,14 +154,16 @@ func NewRunCommand(version string) *cobra.Command {
},
EBPFEventsEnabled: *ebpfEventsEnabled,
EBPFEventsStdioExporterEnabled: *ebpfEventsStdioExporterEnabled,
EBPFEventsPerCPUBuffer: *ebpfEventsPerCPUBuffer,
EBPFEventsOutputChanSize: *ebpfEventsOutputChanSize,
EBPFMetrics: app.EBPFMetricsConfig{
TracerMetricsEnabled: *ebpfTracerMetricsEnabled,
ProgramMetricsEnabled: *ebpfProgramMetricsEnabled,
},
EBPFEventsPolicyConfig: ebpfEventsPolicy,
MutedNamespaces: *mutedNamespaces,
EBPFEventsPolicyConfig: ebpfEventsPolicy,
EBPFSignalEventsRingBufferSize: *EBPFSignalEventsRingBufferSize,
EBPFEventsRingBufferSize: *EBPFEventsRingBufferSize,
EBPFSkbEventsRingBufferSize: *EBPFSkbEventsRingBufferSize,
MutedNamespaces: *mutedNamespaces,
SignatureEngineConfig: signature.SignatureEngineConfig{
InputChanSize: *signatureEngineInputEventChanSize,
OutputChanSize: *signatureEngineOutputEventChanSize,
Expand All @@ -178,7 +182,6 @@ func NewRunCommand(version string) *cobra.Command {
Netflow: app.NetflowConfig{
Enabled: *netflowEnabled,
SampleSubmitIntervalSeconds: *netflowSampleSubmitIntervalSeconds,
OutputChanSize: *netflowOutputChanSize,
Grouping: netflowGrouping,
},
Clickhouse: app.ClickhouseConfig{
Expand Down
5 changes: 0 additions & 5 deletions cmd/agent/daemon/metrics/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ var (
Help: "Counter for tracking pulled events bytes from kernel rate",
}, []string{EventTypeLabel})

AgentKernelLostEventsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "kvisor_agent_kernel_lost_events_total",
Help: "Counter for tracking lost events from kernel rate",
})

AgentSkippedEventsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kvisor_agent_skipped_events_total",
Help: "Counter for tracking skipped events rate",
Expand Down
1 change: 0 additions & 1 deletion cmd/agent/daemon/state/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type netStatsReader interface {

type ebpfTracer interface {
Events() <-chan *types.Event
NetflowEvents() <-chan *types.Event
MuteEventsFromCgroup(cgroup uint64) error
MuteEventsFromCgroups(cgroups []uint64) error
UnmuteEventsFromCgroup(cgroup uint64) error
Expand Down
25 changes: 13 additions & 12 deletions e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,18 +659,19 @@ func (t *testCASTAIServer) validateEvents(ctx context.Context, timeout time.Dura
castaipb.EventType_EVENT_PROCESS_OOM: func(e *castaipb.Event) error {
return nil
},
castaipb.EventType_EVENT_STDIO_VIA_SOCKET: func(e *castaipb.Event) error {
if _, ok := netip.AddrFromSlice(e.GetStdioViaSocket().Ip); !ok {
return fmt.Errorf("invalid address %v", string(e.GetStdioViaSocket().Ip))
}
if e.GetStdioViaSocket().Port == 0 {
return fmt.Errorf("empyt port")
}
if fd := e.GetStdioViaSocket().Socketfd; fd > 2 {
return fmt.Errorf("invalid %d fd", fd)
}
return nil
},
// TODO(anjmao): Rework syscall tracing.
//castaipb.EventType_EVENT_STDIO_VIA_SOCKET: func(e *castaipb.Event) error {
// if _, ok := netip.AddrFromSlice(e.GetStdioViaSocket().Ip); !ok {
// return fmt.Errorf("invalid address %v", string(e.GetStdioViaSocket().Ip))
// }
// if e.GetStdioViaSocket().Port == 0 {
// return fmt.Errorf("empyt port")
// }
// if fd := e.GetStdioViaSocket().Socketfd; fd > 2 {
// return fmt.Errorf("invalid %d fd", fd)
// }
// return nil
//},
castaipb.EventType_EVENT_TTY_WRITE: func(e *castaipb.Event) error {
if e.GetFile().Path == "" {
return fmt.Errorf("empyt file path")
Expand Down
14 changes: 9 additions & 5 deletions pkg/ebpftracer/c/headers/common/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,11 @@ statfunc int save_args_to_submit_buf(event_data_t *event, args_t *args)
return arg_num;
}

#define events_perf_submit(p, id, ret) do_perf_submit(&events, p, id, ret, true)
#define signal_events_perf_submit(p, id, ret) do_perf_submit(&signal_events, p, id, ret, true)
#define events_ringbuf_submit(p, id, ret) do_ringbuf_submit(&events, p, id, ret, true, EVENTS_RINGBUF_DISCARD)
#define signal_events_ringbuf_submit(p, id, ret) do_ringbuf_submit(&signal_events, p, id, ret, true, SIGNAL_EVENTS_RINGBUF_DISCARD)

statfunc int do_ringbuf_submit(void *target, program_data_t *p, u32 id, long ret, bool init_task_ctx, enum metric m) {

statfunc int do_perf_submit(void *target, program_data_t *p, u32 id, long ret, bool init_task_ctx)
{
p->event->context.eventid = id;
p->event->context.retval = ret;

Expand All @@ -500,7 +500,11 @@ statfunc int do_perf_submit(void *target, program_data_t *p, u32 id, long ret, b
:
: [size] "r"(size), [max_size] "i"(MAX_EVENT_SIZE));

return bpf_perf_event_output(p->ctx, target, BPF_F_CURRENT_CPU, p->event, size);
if (bpf_ringbuf_output(target, p->event, size, 0) < 0) {
metrics_increase(m);
return 1;
}
return 0;
}

statfunc event_data_t *find_next_free_scratch_buf(void *scratch_map)
Expand Down
17 changes: 15 additions & 2 deletions pkg/ebpftracer/c/headers/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,24 @@ BPF_ARRAY(config_map, config_t, 1);

// TODO(patrick.pichler): think about maybe removing this
BPF_PERF_OUTPUT(logs, 1024); // logs submission
BPF_PERF_OUTPUT(events, 8192); // events submission
BPF_PERF_OUTPUT(signal_events, 8192); // signal events submission
BPF_PERF_OUTPUT(file_writes, 1024); // file writes events submission
BPF_PERF_OUTPUT(signals, 1024); // control plane signals submissions

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1); // Actual size is set in user space before loading.
} signal_events SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1); // Actual size is set in user space before loading.
} events SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1); // Actual size is set in user space before loading.
} skb_events SEC(".maps");

// Network Maps

#define MAX_NETFLOWS 65535
Expand Down
16 changes: 4 additions & 12 deletions pkg/ebpftracer/c/headers/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ enum metric {
NO_FREE_SCRATCH_BUFFER_SOCKET_SET_STATE,
NO_FREE_SCRATCH_BUFFER_NETFLOWS,

SIGNAL_EVENTS_RINGBUF_DISCARD,
EVENTS_RINGBUF_DISCARD,
SKB_EVENTS_RINGBUF_DISCARD,

MAX_METRIC,
};

Expand Down Expand Up @@ -408,18 +412,6 @@ typedef struct net_event_contextmd {
u8 captured; // packet has already been captured
} __attribute__((__packed__)) net_event_contextmd_t;

typedef struct net_event_context {
event_context_t eventctx;
u8 argnum;
struct { // event arguments (needs packing), use anonymous struct to ...
u8 index0;
u32 bytes;
// ... (payload sent by bpf_perf_event_output)
} __attribute__((__packed__)); // ... avoid address-of-packed-member warns
// members bellow this point are metadata (not part of event to be sent)
net_event_contextmd_t md;
} __attribute__((__packed__)) net_event_context_t;

// network related maps

typedef struct net_task_context {
Expand Down
Loading

0 comments on commit 7b61e8b

Please sign in to comment.