Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-588 Add IPv4/6 DSCP field to the exported flow #158

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->start_mono_time_ts = pkt.current_ts;
}
aggregate_flow->flags |= pkt.flags;

aggregate_flow->dscp = pkt.dscp;
Copy link
Member

@jotak jotak Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not asking any change here, but just for curiosity, do you know if a single flow might have packets with different dscp values ?
(if that's the case, then only the last one would be taken here; which can be OK I guess

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reading about this and it sounds the DSCP value is per flow this how they classify flow as higher pri over the others this is where I get that
https://datatracker.ietf.org/doc/html/rfc8837#name-dscp-mappings

// Does not matter the gate. Will be zero if not enabled.
if (pkt.rtt > aggregate_flow->flow_rtt) {
aggregate_flow->flow_rtt = pkt.rtt;
Expand All @@ -109,7 +109,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.flow_rtt = pkt.rtt
.flow_rtt = pkt.rtt,
.dscp = pkt.dscp,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
Expand Down
4 changes: 4 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef __u64 u64;
#define ETH_P_IPV6 0x86DD
#define ETH_P_ARP 0x0806
#define IPPROTO_ICMPV6 58
#define DSCP_SHIFT 2
#define DSCP_MASK 0x3F

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum {
Expand All @@ -75,6 +77,7 @@ typedef struct flow_metrics_t {
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
struct pkt_drops_t {
u32 packets;
u64 bytes;
Expand Down Expand Up @@ -155,6 +158,7 @@ typedef struct pkt_info_t {
u16 flags; // TCP specific
void *l4_hdr; // Stores the actual l4 header
u64 rtt; // rtt calculated from the flow if possible. else zero
u8 dscp; // IPv4/6 DSCP value
} pkt_info;

// Structure for payload metadata
Expand Down
12 changes: 10 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ static inline void fill_l4info(void *l4_hdr_start, void *data_end, u8 protocol,
}
}

static inline u8 ipv4_get_dscp(const struct iphdr *iph) {
return (iph->tos >> DSCP_SHIFT) & DSCP_MASK;
}

static inline u8 ipv6_get_dscp(const struct ipv6hdr *ipv6h) {
return ((bpf_ntohs(*(const __be16 *)ipv6h) >> 4) >> DSCP_SHIFT) & DSCP_MASK;
}

// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, pkt_info *pkt) {
void *l4_hdr_start;
Expand All @@ -100,7 +108,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, pkt_info *pkt) {
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));

pkt->dscp = ipv4_get_dscp(ip);
/* fill l4 header which will be added to id in flow_monitor function.*/
fill_l4info(l4_hdr_start, data_end, ip->protocol, pkt);
return SUBMIT;
Expand All @@ -118,7 +126,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, pkt_info *pkt)
/* Save the IP Address to id directly. copy once. */
__builtin_memcpy(id->src_ip, ip->saddr.in6_u.u6_addr8, IP_MAX_LEN);
__builtin_memcpy(id->dst_ip, ip->daddr.in6_u.u6_addr8, IP_MAX_LEN);

pkt->dscp = ipv6_get_dscp(ip);
/* fill l4 header which will be added to id in flow_monitor function.*/
fill_l4info(l4_hdr_start, data_end, ip->nexthdr, pkt);
return SUBMIT;
Expand Down
6 changes: 4 additions & 2 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: dscp: 0x%x protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
net.IP(record.Network.GetSrcAddr().GetIpv6()).To16(),
record.Transport.SrcPort,
net.IP(record.Network.GetDstAddr().GetIpv6()).To16(),
record.Transport.DstPort,
record.Network.GetDscp(),
protocolByNumber[record.Transport.Protocol],
record.IcmpType,
record.IcmpCode,
Expand All @@ -97,14 +98,15 @@ func main() {
record.GetPktDropLatestDropCause(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: dscp: 0x%x protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
ipIntToNetIP(record.Network.GetSrcAddr().GetIpv4()).String(),
record.Transport.SrcPort,
ipIntToNetIP(record.Network.GetDstAddr().GetIpv4()).String(),
record.Transport.DstPort,
record.Network.GetDscp(),
protocolByNumber[record.Transport.Protocol],
record.IcmpType,
record.IcmpCode,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/cilium/ebpf v0.11.0
github.com/fsnotify/fsnotify v1.5.1
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/golang/protobuf v1.5.3
github.com/google/gopacket v1.1.19
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
Expand Down Expand Up @@ -41,6 +40,7 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
out["DstAddr"] = ipToStr(flow.Network.GetDstAddr())
out["Proto"] = flow.Transport.GetProtocol()
out["Dscp"] = flow.Network.GetDscp()
proto := flow.Transport.GetProtocol()
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = flow.GetIcmpType()
Expand Down
12 changes: 12 additions & 0 deletions pkg/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -61,6 +62,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -93,6 +95,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -112,6 +115,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -143,6 +147,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -162,6 +167,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -193,6 +199,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -212,6 +219,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "102:304:506:708:90a:b0c:d0e:f10",
"DstAddr": "b0c:d0e:f10:1112:1314:1516:1718:191a",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -288,6 +296,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand Down Expand Up @@ -317,6 +326,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -374,6 +384,7 @@ func TestPBFlowToMap(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand Down Expand Up @@ -407,6 +418,7 @@ func TestPBFlowToMap(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint32(23000),
Expand Down
1 change: 1 addition & 0 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
1 change: 1 addition & 0 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
2 changes: 2 additions & 0 deletions pkg/exporter/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: flow.IntEncodeV4(fr.Id.SrcIp)}},
DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: flow.IntEncodeV4(fr.Id.DstIp)}},
Dscp: uint32(fr.Metrics.Dscp),
},
Transport: &pbflow.Transport{
Protocol: uint32(fr.Id.TransportProtocol),
Expand Down Expand Up @@ -97,6 +98,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Id.SrcIp[:]}},
DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Id.DstIp[:]}},
Dscp: uint32(fr.Metrics.Dscp),
},
Transport: &pbflow.Transport{
Protocol: uint32(fr.Id.TransportProtocol),
Expand Down
4 changes: 4 additions & 0 deletions pkg/flow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
if r.FlowRtt < src.FlowRtt {
r.FlowRtt = src.FlowRtt
}
// Accumulate DSCP
if src.Dscp != 0 {
r.Dscp = src.Dscp
}
}

// IP returns the net.IP equivalent object
Expand Down
2 changes: 2 additions & 0 deletions pkg/flow/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time
0x13, 0x14, //flags
0x33, // u8 errno
0x60, // u8 dscp
// pkt_drops structure
0x10, 0x11, 0x12, 0x13, // u32 packets
0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, // u64 bytes
Expand Down Expand Up @@ -69,6 +70,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
EndMonoTimeTs: 0x1a19181716151413,
Flags: 0x1413,
Errno: 0x33,
Dscp: 0x60,
PktDrops: ebpf.BpfPktDropsT{
Packets: 0x13121110,
Bytes: 0x1b1a191817161514,
Expand Down
Loading