Skip to content

Commit

Permalink
NETOBSERV-1996: in-kernel de-duplication
Browse files Browse the repository at this point in the history
- Remove the interface from the flow key; instead, store it as part of the
  flow value. The first interface+direction pair seen for a given flow is
  the one taken into account for counters. Other interface+direction pairs
  are stored in a separate map for this flow. This algorithm is more or
  less the deduper algorithm that we previously had in userspace.
- Remove user-space deduper
- Adapt user-space model for the new interfaces+directions array
  provided directly from ebpf structs
- Remove "decorator" (which was doing the interface naming). This is
  just for simplification. This enrichment is now done in a more
straightforward way, when creating the Record objects

try optimizing alignment
  • Loading branch information
jotak committed Dec 9, 2024
1 parent f8c24aa commit 93f3d03
Show file tree
Hide file tree
Showing 40 changed files with 603 additions and 1,181 deletions.
40 changes: 31 additions & 9 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}
}

// Record that this flow was also observed on (if_index, direction).
// Counters are only accounted on the first interface seen; any other
// interface+direction pair is appended here, up to
// MAX_OBSERVED_INTERFACES entries. Duplicates are ignored; once the
// array is full, further pairs are silently dropped.
static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
    if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) {
        // Array full: stop tracking additional interfaces for this flow.
        return;
    }
    // Bounded scan keeps the eBPF verifier happy; skip already-known pairs.
    for (u8 i = 0; i < value->nb_observed_intf; i++) {
        if (value->observed_intf[i].direction == direction &&
            value->observed_intf[i].if_index == if_index) {
            return;
        }
    }
    value->observed_intf[value->nb_observed_intf].if_index = if_index;
    value->observed_intf[value->nb_observed_intf].direction = direction;
    value->nb_observed_intf++;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand Down Expand Up @@ -100,12 +114,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, direction);
if (skip) {
return TC_ACT_OK;
}
Expand All @@ -116,10 +126,25 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
if (aggregate_flow->if_index_first_seen == skb->ifindex) {
update_existing_flow(aggregate_flow, &pkt, len);
} else {
// Only add info that we've seen this interface
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
} else {
additional_metrics new_metrics = {};
add_observed_intf(&new_metrics, skb->ifindex, direction);
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
}
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
.if_index_first_seen = skb->ifindex,
.direction_first_seen = direction,
.packets = 1,
.bytes = len,
.eth_protocol = eth_protocol,
Expand Down Expand Up @@ -172,9 +197,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {

// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
// hack on id will be removed with dedup-in-kernel work
id.direction = 0;
id.if_index = 0;
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
Expand Down
12 changes: 6 additions & 6 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {

static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
u16 flags, u32 drop_reason) {
u16 flags, u32 drop_reason, u8 direction) {
int result = 0;

struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
Expand Down Expand Up @@ -157,7 +157,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_

if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
if (id->direction == INGRESS) {
if (direction == INGRESS) {
if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
Expand All @@ -177,7 +177,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}

if (rule->direction != MAX_DIRECTION) {
if (rule->direction == id->direction) {
if (rule->direction == direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
Expand Down Expand Up @@ -233,7 +233,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason, u16 eth_protocol) {
u32 drop_reason, u16 eth_protocol, u8 direction) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
Expand All @@ -247,7 +247,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, direction);
// we have a match so return
if (result > 0) {
return result;
Expand All @@ -259,7 +259,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, direction);
}

#endif //__FLOWS_FILTER_H__
11 changes: 4 additions & 7 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, 0);
if (skip) {
return 0;
}

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
Expand Down
6 changes: 1 addition & 5 deletions bpf/pca.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return false;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = dir;

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, dir);
if (skip) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, 0);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;

// check if this packet needs to be filtered if the filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, 0);
if (skip) {
return 0;
}
Expand Down
39 changes: 21 additions & 18 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 1 // we have only one global filter
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
#define MAX_OBSERVED_INTERFACES 4

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
Expand All @@ -80,19 +81,22 @@ const enum direction_t *unused1 __attribute__((unused));
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

typedef struct flow_metrics_t {
struct bpf_spin_lock lock;
u16 eth_protocol;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
u32 packets;
u64 bytes;
// Flow start and end times as monotonic timestamps in nanoseconds
// as output from bpf_ktime_get_ns()
u64 start_mono_time_ts;
u64 end_mono_time_ts;
u64 bytes;
u32 packets;
u16 eth_protocol;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// OS interface index
u32 if_index_first_seen;
struct bpf_spin_lock lock;
u8 direction_first_seen;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
Expand All @@ -106,35 +110,36 @@ const struct flow_metrics_t *unused2 __attribute__((unused));

typedef struct additional_metrics_t {
struct dns_record_t {
u64 latency;
u16 id;
u16 flags;
u64 latency;
u8 errno;
} dns_record;
struct pkt_drops_t {
u32 packets;
u64 bytes;
u32 packets;
u32 latest_drop_cause;
u16 latest_flags;
u8 latest_state;
u32 latest_drop_cause;
} pkt_drops;
u64 flow_rtt;
u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
struct observed_intf_t {
u8 direction;
u32 if_index;
} __attribute__((packed)) observed_intf[MAX_OBSERVED_INTERFACES];
u8 network_events_idx;
u8 nb_observed_intf;
} additional_metrics;

// Force emitting enums/structs into the ELF
const struct additional_metrics_t *unused3 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct dns_record_t *unused4 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct pkt_drops_t *unused5 __attribute__((unused));
const struct observed_intf_t *unused9 __attribute__((unused));

// Attributes that uniquely identify a flow
typedef struct flow_id_t {
u8 direction;
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
Expand All @@ -147,8 +152,6 @@ typedef struct flow_id_t {
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
// OS interface index
u32 if_index;
} flow_id;

// Force emitting enums/structs into the ELF
Expand Down
4 changes: 2 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol) {
u16 eth_protocol, u8 direction) {
// check if this packet needs to be filtered if the filtering feature is enabled
if (enable_flows_filtering || enable_pca) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol) != 0 &&
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, direction) != 0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
Expand Down
3 changes: 0 additions & 3 deletions cmd/netobserv-ebpf-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func main() {
Error("PProf HTTP listener stopped working")
}()
}
if config.DeduperFCExpiry == 0 {
config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
}

logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")

Expand Down
53 changes: 16 additions & 37 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ type Flows struct {
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
deduper node.MiddleFunc[[]*model.Record, []*model.Record]
exporter node.TerminalFunc[[]*model.Record]

// elements used to decorate flows with extra information
interfaceNamer flow.InterfaceNamer
agentIP net.IP

status Status
promoServer *http.Server
sampleDecoder *ovnobserv.SampleDecoder
Expand Down Expand Up @@ -275,6 +270,8 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
}
return iface
}
model.SetGlobals(agentIP, interfaceNamer)

var promoServer *http.Server
if cfg.MetricsEnable {
promoServer = promo.InitializePrometheus(m.Settings)
Expand All @@ -287,26 +284,19 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
var deduper node.MiddleFunc[[]*model.Record, []*model.Record]
if cfg.Deduper == DeduperFirstCome {
deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
}

return &Flows{
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
deduper: deduper,
agentIP: agentIP,
interfaceNamer: interfaceNamer,
promoServer: promoServer,
sampleDecoder: s,
ebpf: fetcher,
exporter: exporter,
interfaces: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
rbTracer: rbTracer,
accounter: accounter,
limiter: limiter,
promoServer: promoServer,
sampleDecoder: s,
}, nil
}

Expand Down Expand Up @@ -501,9 +491,6 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
limiter := node.AsMiddle(f.limiter.Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))

decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))

ebl := f.cfg.ExporterBufferLength
if ebl == 0 {
ebl = f.cfg.BuffersLength
Expand All @@ -514,17 +501,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo

rbTracer.SendsTo(accounter)

if f.deduper != nil {
deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
deduper.SendsTo(limiter)
} else {
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
}
limiter.SendsTo(decorator)
decorator.SendsTo(export)
mapTracer.SendsTo(limiter)
accounter.SendsTo(limiter)
limiter.SendsTo(export)

alog.Debug("starting graph")
mapTracer.Start()
Expand Down
Loading

0 comments on commit 93f3d03

Please sign in to comment.