NETOBSERV-1995: Use global map, spinlock, split maps #469

Merged: 8 commits, Dec 12, 2024
4 changes: 2 additions & 2 deletions .golangci.yml
@@ -16,9 +16,9 @@ linters:
- stylecheck
- typecheck
- unused
run:
  go: "1.22"
linters-settings:
  stylecheck:
    go: "1.22"
  gocritic:
    enabled-checks:
      - hugeParam
1 change: 1 addition & 0 deletions Makefile
@@ -106,6 +106,7 @@ prereqs: ## Check if prerequisites are met, and install missing dependencies
fmt: ## Run go fmt against code.
@echo "### Formatting code"
go fmt ./...
find ./bpf -type f -not -path "./bpf/headers/*" -name "*.[ch]" | xargs clang-format -i --Werror
Contributor:
I have this rule as part of the lint target. We could make it part of fmt, but in that case we'd need to remove it from lint so we don't run it twice during the build?

Member Author:
I think those are two different things: the fmt rule auto-formats as part of the local build process on your machine, whereas the lint rule is still useful on the CI to make sure we didn't commit & push unformatted code.


.PHONY: lint
lint: prereqs ## Lint the code
92 changes: 66 additions & 26 deletions bpf/flows.c
@@ -51,22 +51,25 @@
*/
#include "network_events_monitoring.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, int dns_errno,
u64 len) {
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = pkt->current_ts;
}
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->dns_record.id = pkt->dns_id;
aggregate_flow->dns_record.flags = pkt->dns_flags;
aggregate_flow->dns_record.latency = pkt->dns_latency;
aggregate_flow->dns_record.errno = dns_errno;
bpf_spin_unlock(&aggregate_flow->lock);
Contributor:
We no longer need

// it might happen that start_mono_time hasn't been set due to

as that was specific to the per-CPU map.

Member Author:
correct, good catch

Member Author:
fixed

}

static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
if (pkt->dns_id != 0) {
Contributor:
You can check dns_errno here: if it's 0, add the DNS info; otherwise add the DNS error.

Member Author:
There can be an error and still DNS data: when the DNS request wasn't found, we can't compute latency, but we still provide the other data and return ENOENT.
So it shouldn't be an if/else here, if we want to still provide what we have.

extra_metrics->dns_record.id = pkt->dns_id;
extra_metrics->dns_record.flags = pkt->dns_flags;
extra_metrics->dns_record.latency = pkt->dns_latency;
}
if (dns_errno != 0) {
extra_metrics->dns_record.errno = dns_errno;
}
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
@@ -76,6 +79,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}
do_sampling = 1;

u16 eth_protocol = 0;

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

@@ -90,7 +96,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
return TC_ACT_OK;
}

@@ -99,7 +105,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
if (skip) {
return TC_ACT_OK;
}
@@ -108,30 +114,22 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt);
}
// TODO: we need to add spinlock here when we deprecate versions prior to 5.1, or provide
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
update_existing_flow(aggregate_flow, &pkt, len);
} else {
// Key does not exist in the map, and will need to create a new entry.
u64 rtt = 0;
Member Author:
@msherif1234 I don't understand the rationale for this code. We're setting default RTT for TCP flows, how could that be accurate? I don't see how this wouldn't produce fake data.

Contributor:
This is the min value for when the hook doesn't fire. IIRC it was a define in the kernel, and I believe we documented that as the default min RTT; when I recall more I will update here.

if (enable_rtt && id.transport_protocol == IPPROTO_TCP) {
rtt = MIN_RTT;
}
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.dns_record.id = pkt.dns_id,
.dns_record.flags = pkt.dns_flags,
.dns_record.latency = pkt.dns_latency,
.dns_record.errno = dns_errno,
.flow_rtt = rtt,
};
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);

long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
Contributor:
we will probably need to spin lock/unlock here as well

Member Author:
the lock is on the value object, not the key, and it's a new object in that case, so I don't think it makes sense to lock

Contributor:
If you have concurrent new flows on different CPUs, one should create the entry and the other will update it?

Comment:
That's what the BPF_NOEXIST flag does: If two CPUs call this concurrently, one of them will fail with EEXIST, and then fall back to the lookup/update path (which does take the lock)
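
A minimal sketch of that pattern, for reference (`value_t`, `locked_update` and `upsert` are illustrative names, not the PR's actual identifiers; the real code uses `flow_metrics` and `update_existing_flow`):

```c
#include <vmlinux.h> // or the repo's own BPF headers
#include <bpf/bpf_helpers.h>

// The spin lock must be embedded in the map value itself.
typedef struct {
    struct bpf_spin_lock lock;
    __u64 packets;
    __u64 bytes;
} value_t;

static __always_inline void locked_update(value_t *val, __u64 len) {
    bpf_spin_lock(&val->lock);
    val->packets += 1;
    val->bytes += len;
    bpf_spin_unlock(&val->lock);
}

// Create-or-update without losing counts under concurrency.
static __always_inline void upsert(void *map, void *key, __u64 len) {
    value_t *val = bpf_map_lookup_elem(map, key);
    if (val) {
        locked_update(val, len);
        return;
    }
    value_t fresh = { .packets = 1, .bytes = len };
    // BPF_NOEXIST: exactly one CPU wins the creation race; the losers
    // get -EEXIST and fall back to the locked in-place update.
    if (bpf_map_update_elem(map, key, &fresh, BPF_NOEXIST) == -EEXIST) {
        val = bpf_map_lookup_elem(map, key);
        if (val) {
            locked_update(val, len);
        }
    }
}
```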

Contributor:
With EEXIST, the lookup and update need to take the lock as well. What about BPF_F_LOCK? Can I set that flag to take care of the lock/unlock for me?

Comment:
BPF_F_LOCK is for when you want to replace the whole contents of the map value; but that's not what you're doing, you are increasing a couple of counters. So you just re-use exactly the same update code as for when the map lookup succeeds in the first place :)
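
To illustrate the distinction, a hedged user-space sketch (assuming libbpf; `map_fd` and `flow_snapshot` are illustrative): BPF_F_LOCK makes the kernel hold the value's embedded spin lock while copying or replacing the whole value, which suits a collector reading snapshots, not a BPF program bumping counters in place.

```c
#include <bpf/bpf.h> /* libbpf user-space API */

/* Copy out one whole map value while the kernel holds its spin lock,
 * so the reader never observes a torn packets/bytes pair. */
static int flow_snapshot(int map_fd, const void *key, void *value_out) {
    return bpf_map_lookup_elem_flags(map_fd, key, value_out, BPF_F_LOCK);
}
```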

if (ret != 0) {
@@ -142,7 +140,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
update_existing_flow(aggregate_flow, &pkt, len);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
@@ -171,6 +169,48 @@
}
}
}

// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
// hack on id will be removed with dedup-in-kernel work
id.direction = 0;
id.if_index = 0;
Contributor:
Will the feature work with this hack?

Member Author:
Sure, it's precisely here to make it work :-)
On the user-space side we assume all additional map entries come without dir/interface. This is temporary; my other PR removes that, since eventually we want to entirely get rid of interface/direction in the flow key.

additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
additional_metrics new_metrics = {
.dns_record.id = pkt.dns_id,
.dns_record.flags = pkt.dns_flags,
.dns_record.latency = pkt.dns_latency,
.dns_record.errno = dns_errno,
};
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error adding DNS %d\n", ret);
}
if (ret == -EEXIST) {
// Concurrent write from another CPU; retry
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
if (trace_messages) {
bpf_printk("failed to update DNS\n");
}
increase_counter(HASHMAP_FAIL_UPDATE_DNS);
}
} else {
increase_counter(HASHMAP_FAIL_UPDATE_DNS);
}
}
}
}

return TC_ACT_OK;
}

13 changes: 7 additions & 6 deletions bpf/flows_filter.h
@@ -202,9 +202,10 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}

static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filter_key_t *key,
u8 *len, u8 *offset, bool use_src_ip) {
u8 *len, u8 *offset, bool use_src_ip,
u16 eth_protocol) {

if (id->eth_protocol == ETH_P_IP) {
if (eth_protocol == ETH_P_IP) {
*len = sizeof(u32);
*offset = sizeof(ip4in6);
if (use_src_ip) {
@@ -213,7 +214,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
__builtin_memcpy(key->ip_data, id->dst_ip + *offset, *len);
}
key->prefix_len = 32;
} else if (id->eth_protocol == ETH_P_IPV6) {
} else if (eth_protocol == ETH_P_IPV6) {
*len = IP_MAX_LEN;
*offset = 0;
if (use_src_ip) {
@@ -232,7 +233,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason) {
u32 drop_reason, u16 eth_protocol) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
@@ -241,7 +242,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
*action = MAX_FILTER_ACTIONS;

// Lets do first CIDR match using srcIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, true);
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, true, eth_protocol);
if (result < 0) {
return result;
}
@@ -253,7 +254,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
}

// if we can't find a match then Lets do second CIDR match using dstIP.
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, false);
result = flow_filter_setup_lookup_key(id, &key, &len, &offset, false, eth_protocol);
if (result < 0) {
return result;
}
11 changes: 10 additions & 1 deletion bpf/maps_definition.h
@@ -11,13 +11,22 @@ struct {

// Key: the flow identifier. Value: the flow metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");

// Key: the flow identifier. Value: extra metrics for that identifier.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__type(key, flow_id);
__type(value, additional_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} additional_flow_metrics SEC(".maps");
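
Note the split: `aggregated_flows` becomes a plain HASH because `struct bpf_spin_lock` is only allowed in values of regular HASH/ARRAY maps, not per-CPU ones, while `additional_flow_metrics` can stay per-CPU since it is updated without a lock. A sketch of the shape the lock-bearing value has to take (illustrative fields; the real `flow_metrics` is defined in the repo's headers):

```c
typedef struct {
    struct bpf_spin_lock lock; /* at most one lock, embedded in the value */
    __u64 packets;
    __u64 bytes;
    __u64 start_mono_time_ts;
    __u64 end_mono_time_ts;
} flow_metrics_sketch;
```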

//PerfEvent Array for Packet Payloads
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
35 changes: 11 additions & 24 deletions bpf/network_events_monitoring.h
@@ -35,15 +35,14 @@ static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8

bpf_probe_read(cookie, md_len, user_cookie);

flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
u8 idx = extra_metrics->network_events_idx;
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
if (!md_already_exists(extra_metrics->network_events, (u8 *)cookie)) {
__builtin_memcpy(extra_metrics->network_events[idx], cookie, MAX_EVENT_MD);
extra_metrics->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
return 0;
}
@@ -53,10 +52,9 @@

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0, flags = 0;
u16 family = 0, flags = 0, eth_protocol = 0;
u8 *user_cookie = NULL;
long ret = 0;
u64 len = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));
@@ -67,12 +65,8 @@
return -1;
}

id.if_index = BPF_CORE_READ(md, in_ifindex);

len = BPF_CORE_READ(skb, len);

// read L2 info
core_fill_in_l2(skb, &id, &family);
core_fill_in_l2(skb, &eth_protocol, &family);

// read L3 info
core_fill_in_l3(skb, &id, family, &protocol, &dscp);
@@ -99,7 +93,7 @@
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
if (skip) {
return 0;
}
@@ -113,19 +107,12 @@
}

// there is no matching flows so lets create new one and add the network event metadata
u64 current_time = bpf_ktime_get_ns();
id.direction = INGRESS;
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.flags = flags,
additional_metrics new_flow = {
.network_events_idx = 0,
};
Contributor:
There are instances where a packet hits this hook but not the TCx one; in that case you will see flows with 0 bytes and 0 packets. Did you check for that?

Member Author:
What sort of instances? My problem with that is that it would duplicate counts if this hook is called before the TC one, or if the TC flow was just flushed, leading to over-estimates. I would rather keep strict responsibility boundaries: TC for counters, other hooks for additional features, and not mix up roles.

Contributor:
Then you need to make sure there are no rows in the UI without counters, as that is misleading.

bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages && ret != -EEXIST) {
bpf_printk("error network events creating new flow %d\n", ret);
5 changes: 3 additions & 2 deletions bpf/pca.h
@@ -41,14 +41,15 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
__builtin_memset(&pkt, 0, sizeof(pkt));
flow_id id;
__builtin_memset(&id, 0, sizeof(id));
u16 eth_protocol = 0;

pkt.id = &id;

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
return false;
}

@@ -57,7 +58,7 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
if (skip) {
return false;
}