Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-2005: Support multiple flow filter rules using json fmt string #473

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@
*/
#include "pkt_translation.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len) {
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len,
u32 sampling) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
bpf_spin_unlock(&aggregate_flow->lock);
}

Expand All @@ -77,15 +79,7 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
Copy link
Member

@jotak jotak Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could have optimized this, and if no filter_sampling is defined, run the global sampling check earlier. Keep the late sampling check only when there filter-based sampling defined.
So when user runs with no filter-sampling defined, but just global sampling, it's more efficient and doesn't parse headers.

Copy link
Member

@jotak jotak Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running some perf, while user-space metrics look unchanged, using bpftop I do see a consequent increase in CPU in kernel, around +100%, when using global sampling=50 and no filter sampling, compared to main baseline before this PR was merged.
I've put some numbers here: https://docs.google.com/spreadsheets/d/1BzrAXr-XEWjizBf-tFhtHcKMNyQOvvW2HH4zXeoDxE8/edit?gid=0#gid=0

if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
do_sampling = 1;

u16 eth_protocol = 0;

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

Expand All @@ -109,18 +103,29 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
u32 filter_sampling = 0;
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
if (skip) {
return TC_ACT_OK;
}
if (filter_sampling == 0) {
filter_sampling = sampling;
}

// If sampling is defined, will only parse 1 out of "sampling" flows
if (filter_sampling > 1 && (bpf_get_prandom_u32() % filter_sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
do_sampling = 1;

int dns_errno = 0;
if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt);
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
Expand All @@ -131,6 +136,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.sampling = filter_sampling,
};
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);
Expand All @@ -144,7 +150,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
Expand Down
13 changes: 9 additions & 4 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {

static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
u16 flags, u32 drop_reason) {
u16 flags, u32 drop_reason, u32 *sampling) {
int result = 0;

struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
Expand Down Expand Up @@ -195,6 +195,11 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
goto end;
}
}
u32 sample = rule->sample;
if (sample && sampling != NULL) {
BPF_PRINTK("sampling action is set to %d\n", sample);
*sampling = sample;
}
}
end:
BPF_PRINTK("result: %d action %d\n", result, *action);
Expand Down Expand Up @@ -233,7 +238,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason, u16 eth_protocol) {
u32 drop_reason, u16 eth_protocol, u32 *sampling) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
Expand All @@ -247,7 +252,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
// we have a match so return
if (result > 0) {
return result;
Expand All @@ -259,7 +264,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
}

#endif //__FLOWS_FILTER_H__
2 changes: 1 addition & 1 deletion bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pca.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL);
if (skip) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pkt_translation.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
4 changes: 3 additions & 1 deletion bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ typedef __u64 u64;
#define DSCP_SHIFT 2
#define DSCP_MASK 0x3F

#define MAX_FILTER_ENTRIES 1 // we have only one global filter
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4

Expand Down Expand Up @@ -99,6 +99,7 @@ typedef struct flow_metrics_t {
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u32 sampling;
} flow_metrics;

// Force emitting enums/structs into the ELF
Expand Down Expand Up @@ -262,6 +263,7 @@ struct filter_value_t {
filter_action action;
tcp_flags tcpFlags;
u8 filter_drops;
u32 sample;
u8 ip[IP_MAX_LEN];
} __attribute__((packed));

Expand Down
4 changes: 2 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol) {
u16 eth_protocol, u32 *sampling) {
// check if this packet need to be filtered if filtering feature is enabled
if (enable_flows_filtering || enable_pca) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol) != 0 &&
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
Expand Down
36 changes: 24 additions & 12 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -199,7 +200,29 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
debug = true
}
filterRules := make([]*tracer.FilterConfig, 0)
if cfg.EnableFlowFilter {
var flowFilters []*FlowFilter
if err := json.Unmarshal([]byte(cfg.FlowFilterRules), &flowFilters); err != nil {
return nil, err
}

for _, r := range flowFilters {
filterRules = append(filterRules, &tracer.FilterConfig{
FilterAction: r.FilterAction,
FilterDirection: r.FilterDirection,
FilterIPCIDR: r.FilterIPCIDR,
FilterProtocol: r.FilterProtocol,
FilterPeerIP: r.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(r.FilterDestinationPort, r.FilterDestinationPortRange, r.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(r.FilterSourcePort, r.FilterSourcePortRange, r.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(r.FilterPort, r.FilterPortRange, r.FilterPorts),
FilterTCPFlags: r.FilterTCPFlags,
FilterDrops: r.FilterDrops,
FilterSample: r.FilterSample,
})
}
}
ebpfConfig := &tracer.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Expand All @@ -216,18 +239,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
EnablePktTranslation: cfg.EnablePktTranslationTracking,
UseEbpfManager: cfg.EbpfProgramManagerMode,
BpfManBpfFSPath: cfg.BpfManBpfFSPath,
FilterConfig: &tracer.FilterConfig{
FilterAction: cfg.FilterAction,
FilterDirection: cfg.FilterDirection,
FilterIPCIDR: cfg.FilterIPCIDR,
FilterProtocol: cfg.FilterProtocol,
FilterPeerIP: cfg.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterTCPFLags: cfg.FilterTCPFlags,
FilterDrops: cfg.FilterDrops,
},
FilterConfig: filterRules,
}

fetcher, err := tracer.NewFlowFetcher(ebpfConfig)
Expand Down
103 changes: 55 additions & 48 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,59 @@ const (
IPIfaceNamedPrefix = "name:"
)

type FlowFilter struct {
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `json:"direction,omitempty"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64, default is 0.0.0.0/0
FilterIPCIDR string `json:"ip_cidr,omitempty"`
// FilterProtocol is the protocol to filter flows.
// supported protocols: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `json:"protocol,omitempty"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `json:"source_port,omitempty"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `json:"destination_port,omitempty"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `json:"port,omitempty"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `json:"source_port_range,omitempty"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `json:"source_ports,omitempty"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `json:"destination_port_range,omitempty"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `json:"destination_ports,omitempty"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `json:"port_range,omitempty"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `json:"ports,omitempty"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `json:"icmp_type,omitempty"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `json:"icmp_code,omitempty"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `json:"peer_ip,omitempty"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `json:"action,omitempty"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `json:"tcp_flags,omitempty"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `json:"drops,omitempty"`
// FilterSample is the sample rate this matching flow will use
FilterSample uint32 `json:"sample,omitempty"`
}

type Config struct {
// AgentIP allows overriding the reported Agent IP address on each flow.
AgentIP string `env:"AGENT_IP"`
Expand Down Expand Up @@ -183,54 +236,8 @@ type Config struct {
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
// EnableFlowFilter enables flow filter, default is false.
EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"`
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `env:"FILTER_DIRECTION"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64
FilterIPCIDR string `env:"FILTER_IP_CIDR" envDefault:"0.0.0.0/0"`
// FilterProtocol is the protocol to filter flows.
// Example: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `env:"FILTER_PROTOCOL"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `env:"FILTER_SOURCE_PORT"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `env:"FILTER_DESTINATION_PORT"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `env:"FILTER_PORT"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `env:"FILTER_SOURCE_PORT_RANGE"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `env:"FILTER_SOURCE_PORTS"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `env:"FILTER_DESTINATION_PORT_RANGE"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `env:"FILTER_DESTINATION_PORTS"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `env:"FILTER_PORT_RANGE"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `env:"FILTER_PORTS"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `env:"FILTER_ICMP_TYPE"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `env:"FILTER_ICMP_CODE"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `env:"FILTER_PEER_IP"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `env:"FILTER_ACTION" envDefault:"Accept"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `env:"FILTER_TCP_FLAGS"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `env:"FILTER_DROPS" envDefault:"false"`
// FlowFilterRules list of flow filter rules
FlowFilterRules string `env:"FLOW_FILTER_RULES"`
// EnableNetworkEventsMonitoring enables monitoring network plugin events, default is false.
EnableNetworkEventsMonitoring bool `env:"ENABLE_NETWORK_EVENTS_MONITORING" envDefault:"false"`
// NetworkEventsMonitoringGroupID to allow ebpf hook to process samples for specific groupID and ignore the rest
Expand Down
Loading
Loading