
NETOBSERV-1996: in-kernel de-duplication #470

Merged · 12 commits · Jan 8, 2025
77 changes: 58 additions & 19 deletions bpf/flows.c
@@ -79,6 +79,23 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}
}

static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) {
for (u8 i = 0; i < value->nb_observed_intf; i++) {
if (value->observed_intf[i].if_index == if_index &&
Contributor:

we should check if_index to make sure it's not 0

Member Author:

hmm, do you think I should also ignore setting if_index_first_seen to skb->ifindex if it's 0?
(in

        flow_metrics new_flow = {
            .if_index_first_seen = skb->ifindex,
            .direction_first_seen = direction,
            .packets = 1,
            .bytes = len,
// ...

)

Member Author:

(I don't know in what cases it would be 0, or whether it's something to handle in a special way)

Member Author:

added else if (skb->ifindex != 0) earlier in the code path

Contributor:

there are certain hooks that don't populate if_index; you can see examples of that in the current code where we skip populating the flow if_index

Member Author:

ok, but here it's the TC hook; maybe that's no longer relevant now that other hooks use a different map? Anyway, it won't hurt to keep these checks, I guess...
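For reference, the resulting guard in the flows.c hunk below, abridged: a zero ifindex neither takes over the first-seen metrics path nor gets recorded as an additional observed interface.

    if (aggregate_flow->if_index_first_seen == skb->ifindex) {
        update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
    } else if (skb->ifindex != 0) {
        // only record that this interface also observed the flow
    }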

value->observed_intf[i].direction == direction) {
return;
}
}
value->observed_intf[value->nb_observed_intf].if_index = if_index;
value->observed_intf[value->nb_observed_intf].direction = direction;
value->nb_observed_intf++;
jotak marked this conversation as resolved.
} else {
increase_counter(OBSERVED_INTF_MISSED);
BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
}
Contributor:

we need an else here to log a message and probably update a counter. I assume the value of 4 is based on what we see today, but depending on how often we hit the else we might need to increase it.

when we hit the above condition, is that a bad flow that we should drop? Alternatively, you can return an error code from this function and do the print and counter update at the caller, but we can't just be silent when this condition happens
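A minimal sketch of the error-returning variant suggested here; the helper name and the -ENOSPC code are hypothetical, and the merged code keeps the counter update and log inside the helper instead (dedup scan omitted for brevity):

    // Hypothetical alternative: signal overflow to the caller instead of
    // logging and counting inside the helper.
    static inline int add_observed_intf_or_err(additional_metrics *value, u32 if_index,
                                               u8 direction) {
        if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) {
            return -ENOSPC; // caller increments OBSERVED_INTF_MISSED and prints
        }
        value->observed_intf[value->nb_observed_intf].if_index = if_index;
        value->observed_intf[value->nb_observed_intf].direction = direction;
        value->nb_observed_intf++;
        return 0;
    }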

Member Author:

right, +1

Member Author (@jotak, Dec 18, 2024):

I think there's the same issue with MAX_NETWORK_EVENTS, which we may silently ignore. I'll check and add that at the same time.

Contributor:

for network events I return an error, so that should be fine

Member Author:

Oh, I didn't notice the rolling index here.
So when capacity is reached, it erases the oldest entry, but does so silently, or am I missing something?

Member Author:

(anyway, I won't touch that in this PR; I'll focus just on observed_intf)

Contributor:

yeah, that was by design: newer events will overwrite older ones. But as of now we can't get more than 3 events per flow, so we should be fine for now
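A sketch of the rolling overwrite being described, assuming the network_events/network_events_idx fields from types.h and an event metadata buffer md; the actual code may differ:

    // The write index keeps growing and is used modulo the array size,
    // so once the array is full the newest event replaces the oldest.
    u8 slot = extra_metrics->network_events_idx % MAX_NETWORK_EVENTS;
    __builtin_memcpy(extra_metrics->network_events[slot], md, MAX_EVENT_MD);
    extra_metrics->network_events_idx++;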

Member Author:

done here f2a7a94

}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;

@@ -110,13 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling,
direction);
if (filter_sampling == 0) {
filter_sampling = sampling;
}
@@ -137,19 +151,47 @@
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
if (aggregate_flow->if_index_first_seen == skb->ifindex) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else if (skb->ifindex != 0) {
// Only add info that we've seen this interface
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
} else {
additional_metrics new_metrics = {
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
};
add_observed_intf(&new_metrics, skb->ifindex, direction);
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret == -EEXIST) {
extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
}
} else if (ret != 0 && trace_messages) {
bpf_printk("error creating new observed_intf: %d\n", ret);
}
}
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.sampling = filter_sampling,
};
flow_metrics new_flow;
__builtin_memset(&new_flow, 0, sizeof(new_flow));
new_flow.if_index_first_seen = skb->ifindex;
new_flow.direction_first_seen = direction;
new_flow.packets = 1;
new_flow.bytes = len;
new_flow.eth_protocol = eth_protocol;
new_flow.start_mono_time_ts = pkt.current_ts;
new_flow.end_mono_time_ts = pkt.current_ts;
new_flow.dscp = pkt.dscp;
new_flow.sampling = filter_sampling;
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);
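One plausible reason for trading the designated initializer for __builtin_memset (an inference, not stated in the PR): an initializer only guarantees the named members are set and can leave padding bytes unspecified, while memset zeroes every byte of the struct before it is copied into the map:

    flow_metrics a = { .packets = 1 };   // named fields zeroed; padding unspecified
    flow_metrics b;
    __builtin_memset(&b, 0, sizeof(b));  // every byte zeroed, padding included
    b.packets = 1;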

@@ -194,9 +236,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {

// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
// hack on id will be removed with dedup-in-kernel work
id.direction = 0;
id.if_index = 0;
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
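The flows.c hunk above creates the additional_metrics entry with BPF_NOEXIST and re-looks it up on -EEXIST, which covers a concurrent insert from another CPU. Distilled, and assuming a generic BPF hash map my_map with value type val_t, the idiom reads:

    val_t init = {0};
    long ret = bpf_map_update_elem(&my_map, &key, &init, BPF_NOEXIST);
    if (ret == -EEXIST) {
        // lost the race: another CPU inserted first, so update its entry
        val_t *v = bpf_map_lookup_elem(&my_map, &key);
        if (v != NULL) {
            // merge into *v
        }
    } else if (ret != 0) {
        // genuine failure, e.g. -E2BIG when the map is full
    }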
16 changes: 10 additions & 6 deletions bpf/flows_filter.h
@@ -33,7 +33,8 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {

static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
u16 flags, u32 drop_reason, u32 *sampling) {
u16 flags, u32 drop_reason, u32 *sampling,
u8 direction) {
int result = 0;

struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
@@ -161,7 +162,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_

if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
if (id->direction == INGRESS) {
if (direction == INGRESS) {
if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
@@ -181,7 +182,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}

if (rule->direction != MAX_DIRECTION) {
if (rule->direction == id->direction) {
if (rule->direction == direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
@@ -237,7 +238,8 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason, u16 eth_protocol, u32 *sampling) {
u32 drop_reason, u16 eth_protocol, u32 *sampling,
u8 direction) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
@@ -251,7 +253,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
direction);
// we have a match so return
if (result > 0) {
return result;
@@ -263,7 +266,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
direction);
}

#endif //__FLOWS_FILTER_H__
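A worked example of the direction handling above: for a rule carrying ip 10.0.0.0/8, an INGRESS packet matches when its destination address falls inside the prefix, and an EGRESS packet when its source address does. The EGRESS branch is collapsed in this diff, but the pair plausibly reads:

    if (direction == INGRESS) {
        // traffic arriving: compare the rule IP against the destination
        if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) { result++; }
    } else {
        // traffic leaving: compare the rule IP against the source
        if (is_equal_ip(rule->ip, id->src_ip + offset, len)) { result++; }
    }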
11 changes: 4 additions & 7 deletions bpf/network_events_monitoring.h
@@ -94,17 +94,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
6 changes: 1 addition & 5 deletions bpf/pca.h
@@ -53,12 +53,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return false;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL, dir);
if (skip) {
return false;
}
2 changes: 1 addition & 1 deletion bpf/pkt_drops.h
@@ -64,7 +64,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
2 changes: 1 addition & 1 deletion bpf/pkt_translation.h
@@ -163,7 +163,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
2 changes: 1 addition & 1 deletion bpf/rtt_tracker.h
@@ -59,7 +59,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
48 changes: 25 additions & 23 deletions bpf/types.h
@@ -66,6 +66,7 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
#define MAX_OBSERVED_INTERFACES 4

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
@@ -80,26 +81,29 @@ const enum direction_t *unused1 __attribute__((unused));
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

typedef struct flow_metrics_t {
struct bpf_spin_lock lock;
u16 eth_protocol;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
u32 packets;
u64 bytes;
// Flow start and end times as monotonic timestamps in nanoseconds
// as output from bpf_ktime_get_ns()
u64 start_mono_time_ts;
u64 end_mono_time_ts;
u64 bytes;
u32 packets;
u16 eth_protocol;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// OS interface index
u32 if_index_first_seen;
struct bpf_spin_lock lock;
u32 sampling;
u8 direction_first_seen;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u32 sampling;
} flow_metrics;
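The reordering above is consistent with sorting members by decreasing size to minimize padding (an inference; the PR does not state it). An illustration on a typical 64-bit ABI:

    struct bad  { u8 a; u64 b; u8 c; };  // 7 + 7 padding bytes -> sizeof == 24
    struct good { u64 b; u8 a; u8 c; };  // tail padding only   -> sizeof == 16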

// Force emitting enums/structs into the ELF
@@ -109,22 +113,20 @@ typedef struct additional_metrics_t {
u64 start_mono_time_ts;
u64 end_mono_time_ts;
struct dns_record_t {
u64 latency;
u16 id;
u16 flags;
u64 latency;
u8 errno;
} dns_record;
struct pkt_drops_t {
u32 packets;
u64 bytes;
u32 packets;
u32 latest_drop_cause;
u16 latest_flags;
u8 latest_state;
u32 latest_drop_cause;
} pkt_drops;
u64 flow_rtt;
u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
u16 eth_protocol;
struct translated_flow_t {
u8 saddr[IP_MAX_LEN];
u8 daddr[IP_MAX_LEN];
@@ -133,23 +135,24 @@ typedef struct additional_metrics_t {
u16 zone_id;
u8 icmp_id;
} translated_flow;
struct observed_intf_t {
u8 direction;
u32 if_index;
} observed_intf[MAX_OBSERVED_INTERFACES];
u16 eth_protocol;
u8 network_events_idx;
u8 nb_observed_intf;
} additional_metrics;

// Force emitting enums/structs into the ELF
const struct additional_metrics_t *unused3 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct dns_record_t *unused4 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct pkt_drops_t *unused5 __attribute__((unused));

// Force emitting struct translated_flow_t into the ELF.
const struct translated_flow_t *unused6 __attribute__((unused));
const struct observed_intf_t *unused13 __attribute__((unused));

// Attributes that uniquely identify a flow
typedef struct flow_id_t {
u8 direction;
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
@@ -162,8 +165,6 @@ typedef struct flow_id_t {
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
// OS interface index
u32 if_index;
} flow_id;
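    // Dropping direction and if_index from the key is the core of the
    // in-kernel de-duplication:
    // Before: key = 5-tuple + if_index + direction -> one map entry per interface
    // After:  key = 5-tuple only -> one aggregated_flows entry per flow; other
    //         interfaces are recorded in observed_intf[] on additional_metrics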

// Force emitting enums/structs into the ELF
@@ -220,6 +221,7 @@ typedef enum global_counters_key_t {
NETWORK_EVENTS_ERR_GROUPID_MISMATCH,
NETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS,
NETWORK_EVENTS_GOOD,
OBSERVED_INTF_MISSED,
MAX_COUNTERS,
} global_counters_key;

@@ -230,7 +232,7 @@ const enum global_counters_key_t *unused9 __attribute__((unused));
struct filter_key_t {
u32 prefix_len;
u8 ip_data[IP_MAX_LEN];
} __attribute__((packed));
} filter_key;

// Force emitting enums/structs into the ELF
const struct filter_key_t *unused10 __attribute__((unused));
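The prefix_len plus ip_data layout of filter_key_t matches the key convention of BPF longest-prefix-match tries. The filter_map definition sits outside this diff, but under that assumption it would look roughly like:

    struct {
        __uint(type, BPF_MAP_TYPE_LPM_TRIE);
        __type(key, struct filter_key_t);
        __type(value, struct filter_value_t);
        __uint(max_entries, MAX_FILTER_ENTRIES);
        __uint(map_flags, BPF_F_NO_PREALLOC); // LPM tries require this flag
    } filter_map SEC(".maps");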
8 changes: 5 additions & 3 deletions bpf/utils.h
@@ -184,12 +184,14 @@ static inline bool is_filter_enabled() {
/*
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol, u32 *sampling) {
static __always_inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol, u32 *sampling,
u8 direction) {
// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling, direction) !=
0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
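With direction gone from flow_id, it now travels as an explicit argument: the TC and PCA paths pass the real value, while hooks that don't know it (packet drops, packet translation, RTT, network events) pass 0, as the diffs above show. The switch to __always_inline also mirrors the other filtering helpers in flows_filter.h.

    // TC hook: direction is known
    check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling, direction);
    // kprobe/tracepoint-style hooks: direction unknown
    check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL, 0);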
3 changes: 0 additions & 3 deletions cmd/netobserv-ebpf-agent.go
@@ -51,9 +51,6 @@ func main() {
Error("PProf HTTP listener stopped working")
}()
}
if config.DeduperFCExpiry == 0 {
config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
}

logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")
