diff --git a/bpf/flows.c b/bpf/flows.c index ac4d710bb..f998e684d 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -58,11 +58,12 @@ struct { } direct_flows SEC(".maps"); // Key: the flow identifier. Value: the flow metrics for that identifier. -// The userspace will aggregate them into a single flow. struct { - __uint(type, BPF_MAP_TYPE_PERCPU_HASH); + __uint(type, BPF_MAP_TYPE_HASH); __type(key, flow_id); __type(value, flow_metrics); + __uint(max_entries, 1 << 24); + __uint(map_flags, BPF_F_NO_PREALLOC); } aggregated_flows SEC(".maps"); // Constant definitions, to be overridden by the invoker @@ -260,11 +261,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { aggregate_flow->packets += 1; aggregate_flow->bytes += skb->len; aggregate_flow->end_mono_time_ts = current_time; - // it might happen that start_mono_time hasn't been set due to - // the way percpu hashmap deal with concurrent map entries - if (aggregate_flow->start_mono_time_ts == 0) { - aggregate_flow->start_mono_time_ts = current_time; - } aggregate_flow->flags |= flags; long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); if (trace_messages && ret != 0) { diff --git a/docs/architecture.md b/docs/architecture.md index f659b92a2..56fa49326 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -11,7 +11,7 @@ flowchart TD E(ebpf.FlowFetcher) --> |"pushes via
RingBuffer"| RB(flow.RingBufTracer) style E fill:#990 - E --> |"polls
PerCPUHashMap"| M(flow.MapTracer) + E --> |"polls
HashMap"| M(flow.MapTracer) RB --> |chan *flow.Record| ACC(flow.Accounter) RB -.-> |flushes| M ACC --> |"chan []*flow.Record"| DD(flow.Deduper) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 068eeec3a..e2b771559 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error - LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics ReadRingBuf() (ringbuf.Record, error) } diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index b615f680a..a8818ab93 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -49,11 +49,6 @@ var ( DstPort: 456, IfIndex: 3, } - key1Dupe = ebpf.BpfFlowId{ - SrcPort: 123, - DstPort: 456, - IfIndex: 4, - } key2 = ebpf.BpfFlowId{ SrcPort: 333, @@ -71,7 +66,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 2) + assert.Len(t, exported, 1) receivedKeys := map[ebpf.BpfFlowId]struct{}{} @@ -81,21 +76,11 @@ func TestFlowsAgent_Deduplication(t *testing.T) { receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows) @@ -112,33 +97,22 @@ func TestFlowsAgent_DeduplicationJustMark(t 
*testing.T) { exported := export.Get(t, timeout) receivedKeys := map[ebpf.BpfFlowId]struct{}{} - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) duplicates := 0 for _, f := range exported { require.NotContains(t, receivedKeys, f.Id) receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) if f.Duplicate { duplicates++ } assert.Equal(t, "foo", f.Interface) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - if f.Duplicate { - duplicates++ - } - assert.Equal(t, "bar", f.Interface) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } - assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported) + assert.Equalf(t, 0, duplicates, "exported flows should not have any duplicate: %#v", exported) } func TestFlowsAgent_Deduplication_None(t *testing.T) { @@ -149,7 +123,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) receivedKeys := map[ebpf.BpfFlowId]struct{}{} var key1Flows []*flow.Record @@ -158,24 +132,14 @@ receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - 
case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } - assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows) + assert.Lenf(t, key1Flows, 1, "only one key1 flow should have been forwarded: %#v", key1Flows) } func TestFlowsAgent_Decoration(t *testing.T) { @@ -185,7 +149,7 @@ func TestFlowsAgent_Decoration(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) // Tests that the decoration stage has been properly executed. It should // add the interface name and the agent IP @@ -219,18 +183,10 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { }) now := uint64(monotime.Now()) - key1Metrics := []ebpf.BpfFlowMetrics{ - {Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}, - {Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000}, - } - key2Metrics := []ebpf.BpfFlowMetrics{ - {Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000}, - } + key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000} - ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{ - key1: key1Metrics, - key1Dupe: key1Metrics, - key2: key2Metrics, + ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{ + key1: key1Metrics, }) return export } diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go index 4eb412d70..575ad0681 100644 --- a/pkg/ebpf/bpf_bpfeb.go +++ b/pkg/ebpf/bpf_bpfeb.go @@ -61,9 +61,9 @@ func LoadBpf() (*ebpf.CollectionSpec, error) { // // The following types are suitable as obj argument: // -// *BpfObjects -// *BpfPrograms -// *BpfMaps +// *BpfObjects +// *BpfPrograms +// *BpfMaps // // See ebpf.CollectionSpec.LoadAndAssign documentation for details. 
func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { @@ -154,5 +154,6 @@ func _BpfClose(closers ...io.Closer) error { } // Do not access this directly. +// //go:embed bpf_bpfeb.o var _BpfBytes []byte diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index 3f851fa3b..178262cee 100644 Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go index 4de7b8d26..bae6823f0 100644 --- a/pkg/ebpf/bpf_bpfel.go +++ b/pkg/ebpf/bpf_bpfel.go @@ -61,9 +61,9 @@ func LoadBpf() (*ebpf.CollectionSpec, error) { // // The following types are suitable as obj argument: // -// *BpfObjects -// *BpfPrograms -// *BpfMaps +// *BpfObjects +// *BpfPrograms +// *BpfMaps // // See ebpf.CollectionSpec.LoadAndAssign documentation for details. func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { @@ -154,5 +154,6 @@ func _BpfClose(closers ...io.Closer) error { } // Do not access this directly. +// //go:embed bpf_bpfel.o var _BpfBytes []byte diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o index b9d7500c4..68b058b20 100644 Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 32b2641ba..4efa75bbf 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -308,17 +308,17 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md // Race conditions here causes that some flows are lost in high-load scenarios -func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { +func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]BpfFlowMetrics { flowMap := m.objects.AggregatedFlows iterator := flowMap.Iterate() - flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) + 
var flow = make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize) id := BpfFlowId{} - var metrics []BpfFlowMetrics + var metric BpfFlowMetrics // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively - for iterator.Next(&id, &metrics) { + for iterator.Next(&id, &metric) { if err := flowMap.Delete(id); err != nil { log.WithError(err).WithField("flowId", id). Warnf("couldn't delete flow entry") @@ -326,7 +326,7 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { // We observed that eBFP PerCPU map might insert multiple times the same key in the map // (probably due to race conditions) so we need to re-join metrics again at userspace // TODO: instrument how many times the keys are is repeated in the same eviction - flows[id] = append(flows[id], metrics...) + flow[id] = metric } - return flows + return flow } diff --git a/pkg/flow/account.go b/pkg/flow/account.go index f1eca9701..a38b8140a 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -65,9 +65,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { alog.Debug("exiting account routine") return } - if stored, ok := c.entries[record.Id]; ok { - Accumulate(stored, &record.Metrics) - } else { + if _, ok := c.entries[record.Id]; !ok { if len(c.entries) >= c.maxEntries { evictingEntries := c.entries c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index 160054932..c53df299c 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1, + Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, }, }, TimeFlowStart: now.Add(-(1000 - 123) * 
time.Nanosecond), - TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), + TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond), }, k2: { RawRecord: RawRecord{ @@ -178,15 +178,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 30, - Packets: 3, + Bytes: 10, + Packets: 1, StartMonoTimeTs: 123, - EndMonoTimeTs: 789, + EndMonoTimeTs: 123, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 123), - TimeFlowEnd: now.Add(-1000 + 789), + TimeFlowEnd: now.Add(-1000 + 123), }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -194,15 +194,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 20, - Packets: 2, + Bytes: 10, + Packets: 1, StartMonoTimeTs: 1123, - EndMonoTimeTs: 1456, + EndMonoTimeTs: 1123, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 1123), - TimeFlowEnd: now.Add(-1000 + 1456), + TimeFlowEnd: now.Add(-1000 + 1123), }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 4dc8d2a98..60dc0148a 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -70,19 +70,6 @@ func NewRecord( } } -func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) { - // time == 0 if the value has not been yet set - if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs { - r.StartMonoTimeTs = src.StartMonoTimeTs - } - if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs { - r.EndMonoTimeTs = src.EndMonoTimeTs - } - r.Bytes += src.Bytes - r.Packets += src.Packets - r.Flags |= src.Flags -} - // IP returns the net.IP equivalent object func IP(ia IPAddr) net.IP { return ia[:] diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index 3567592ce..563c2850b 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -25,7 +25,7 @@ type MapTracer struct { } type mapFetcher interface { - LookupAndDeleteMap() 
map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics } func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer { @@ -92,7 +92,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor var forwardingFlows []*Record laterFlowNs := uint64(0) for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() { - aggregatedMetrics := m.aggregate(flowMetrics) + aggregatedMetrics := flowMetrics // we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored) if aggregatedMetrics.EndMonoTimeTs == 0 { continue @@ -117,21 +117,3 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor } mtlog.Debugf("%d flows evicted", len(forwardingFlows)) } - -func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics { - if len(metrics) == 0 { - mtlog.Warn("invoked aggregate with no values") - return ebpf.BpfFlowMetrics{} - } - aggr := ebpf.BpfFlowMetrics{} - for _, mt := range metrics { - // eBPF hashmap values are not zeroed when the entry is removed. That causes that we - // might receive entries from previous collect-eviction timeslots. - // We need to check the flow time and discard old flows. 
- if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs { - continue - } - Accumulate(&aggr, &mt) - } - return aggr -} diff --git a/pkg/flow/tracer_map_test.go b/pkg/flow/tracer_map_test.go index 4486992d1..9ea7c1680 100644 --- a/pkg/flow/tracer_map_test.go +++ b/pkg/flow/tracer_map_test.go @@ -11,36 +11,25 @@ import ( func TestPacketAggregation(t *testing.T) { type testCase struct { - input []ebpf.BpfFlowMetrics + input ebpf.BpfFlowMetrics expected ebpf.BpfFlowMetrics } tcs := []testCase{{ - input: []ebpf.BpfFlowMetrics{ - {Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1}, - {Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - }, + input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, expected: ebpf.BpfFlowMetrics{ Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1, }, }, { - input: []ebpf.BpfFlowMetrics{ - {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, - {Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - }, + input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, expected: ebpf.BpfFlowMetrics{ - Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, + Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, }, }} - ft := MapTracer{} for 
i, tc := range tcs { t.Run(fmt.Sprint(i), func(t *testing.T) { assert.Equal(t, tc.expected, - ft.aggregate(tc.input)) + tc.input) }) } } diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 0943b7673..495acfca5 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -13,14 +13,14 @@ import ( // TracerFake fakes the kernel-side eBPF map structures for testing type TracerFake struct { interfaces map[ifaces.Interface]struct{} - mapLookups chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + mapLookups chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics ringBuf chan ringbuf.Record } func NewTracerFake() *TracerFake { return &TracerFake{ interfaces: map[ifaces.Interface]struct{}{}, - mapLookups: make(chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics, 100), + mapLookups: make(chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics, 100), ringBuf: make(chan ringbuf.Record, 100), } } @@ -33,12 +33,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } -func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { +func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: return r default: - return map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{} + return map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{} } } @@ -46,7 +46,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) { return <-m.ringBuf, nil } -func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics) { +func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics) { m.mapLookups <- results }