Skip to content

Commit

Permalink
Merge pull request #496 from jotak/sampling-opt
Browse files Browse the repository at this point in the history
Restore performances in filtering case
  • Loading branch information
msherif1234 authored Jan 9, 2025
2 parents 7c29f2a + 24bceb9 commit 983f777
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 27 deletions.
1 change: 1 addition & 0 deletions bpf/configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 has_filter_sampling = 0;
volatile const u8 trace_messages = 0;
volatile const u8 enable_rtt = 0;
volatile const u8 enable_pca = 0;
Expand Down
20 changes: 10 additions & 10 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,13 @@ static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;

if (!is_filter_enabled()) {
if (!has_filter_sampling) {
// When no filter sampling is defined, run the sampling check at the earliest for better performances
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
filter_sampling = sampling;
do_sampling = 1;
}

Expand All @@ -128,9 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}

// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling,
direction);
u32 filter_sampling = 0;
bool skip =
check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling, direction);
if (has_filter_sampling) {
if (filter_sampling == 0) {
filter_sampling = sampling;
}
Expand All @@ -140,9 +140,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}
do_sampling = 1;
if (skip) {
return TC_ACT_OK;
}
}
if (skip) {
return TC_ACT_OK;
}

int dns_errno = 0;
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
25 changes: 14 additions & 11 deletions pkg/tracer/flow_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,14 @@ type FilterConfig struct {
}

type Filter struct {
// eBPF objs to create/update eBPF maps
objects *ebpf.BpfObjects
config []*FilterConfig
config []*FilterConfig
}

func NewFilter(objects *ebpf.BpfObjects, cfg []*FilterConfig) *Filter {
return &Filter{
objects: objects,
config: cfg,
}
func NewFilter(cfg []*FilterConfig) *Filter {
return &Filter{config: cfg}
}

func (f *Filter) ProgramFilter() error {

func (f *Filter) ProgramFilter(objects *ebpf.BpfObjects) error {
for _, config := range f.config {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFilterKey(config)
Expand All @@ -55,7 +49,7 @@ func (f *Filter) ProgramFilter() error {
return fmt.Errorf("failed to get filter value: %w", err)
}

err = f.objects.FilterMap.Update(key, val, cilium.UpdateAny)
err = objects.FilterMap.Update(key, val, cilium.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update filter map: %w", err)
}
Expand Down Expand Up @@ -264,3 +258,12 @@ func ConvertFilterPortsToInstr(intPort int32, rangePorts, ports string) intstr.I
}
return intstr.FromInt32(intPort)
}

func (f *Filter) hasSampling() uint8 {
for _, r := range f.config {
if r.FilterSample > 0 {
return 1
}
}
return 0
}
21 changes: 15 additions & 6 deletions pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
dnsLatencyMap = "dns_flows"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
constHasFilterSampling = "has_filter_sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
constEnableDNSTracking = "enable_dns_tracking"
Expand Down Expand Up @@ -114,6 +115,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
var err error
objects := ebpf.BpfObjects{}
var pinDir string
var filter *Filter
if cfg.EnableFlowFilter {
filter = NewFilter(cfg.FilterConfig)
}

if !cfg.UseEbpfManager {
if err := rlimit.RemoveMemlock(); err != nil {
Expand Down Expand Up @@ -162,8 +167,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}

enableFlowFiltering := 0
if cfg.EnableFlowFilter {
hasFilterSampling := uint8(0)
if filter != nil {
enableFlowFiltering = 1
hasFilterSampling = filter.hasSampling()
}
enableNetworkEventsMonitoring := 0
if cfg.EnableNetworkEventsMonitoring {
Expand All @@ -178,7 +185,9 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
enablePktTranslation = 1
}
if err := spec.RewriteConstants(map[string]interface{}{
// When adding constants here, remember to delete them in NewPacketFetcher
constSampling: uint32(cfg.Sampling),
constHasFilterSampling: hasFilterSampling,
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constEnableDNSTracking: uint8(enableDNSTracking),
Expand Down Expand Up @@ -326,9 +335,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}
}

if cfg.EnableFlowFilter {
f := NewFilter(&objects, cfg.FilterConfig)
if err := f.ProgramFilter(); err != nil {
if filter != nil {
if err := filter.ProgramFilter(&objects); err != nil {
return nil, fmt.Errorf("programming flow filter: %w", err)
}
}
Expand Down Expand Up @@ -1291,6 +1299,7 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
delete(spec.Programs, aggregatedFlowsMap)
delete(spec.Programs, additionalFlowMetrics)
delete(spec.Programs, constSampling)
delete(spec.Programs, constHasFilterSampling)
delete(spec.Programs, constTraceMessages)
delete(spec.Programs, constEnableDNSTracking)
delete(spec.Programs, constDNSTrackingPort)
Expand Down Expand Up @@ -1330,8 +1339,8 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
},
}

f := NewFilter(&objects, cfg.FilterConfig)
if err := f.ProgramFilter(); err != nil {
f := NewFilter(cfg.FilterConfig)
if err := f.ProgramFilter(&objects); err != nil {
return nil, fmt.Errorf("programming flow filter: %w", err)
}

Expand Down

0 comments on commit 983f777

Please sign in to comment.