change aggregation flow map to hashmap instead of perCPU hashmap (#118)
Signed-off-by: msherif1234 <[email protected]>
msherif1234 authored May 31, 2023
1 parent ec9ca7a commit b6e2b87
Showing 15 changed files with 57 additions and 147 deletions.
10 changes: 3 additions & 7 deletions bpf/flows.c
@@ -58,11 +58,12 @@ struct {
} direct_flows SEC(".maps");

// Key: the flow identifier. Value: the flow metrics for that identifier.
// The userspace will aggregate them into a single flow.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");

// Constant definitions, to be overridden by the invoker
@@ -260,11 +261,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = current_time;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deals with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = current_time;
}
aggregate_flow->flags |= flags;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
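The userspace consequence of this map-type swap is worth making explicit. With github.com/cilium/ebpf (the library this agent's Go side builds on), iterating a BPF_MAP_TYPE_PERCPU_HASH decodes one value per possible CPU into a slice, while a plain BPF_MAP_TYPE_HASH decodes a single value. A minimal sketch of the difference — BpfFlowId and BpfFlowMetrics here are illustrative stand-ins for the bpf2go-generated types, not their real layouts:

```go
package sketch

import "github.com/cilium/ebpf"

// Illustrative stand-ins for the bpf2go-generated types; the real structs
// mirror flow_id/flow_metrics in bpf/flows.c.
type BpfFlowId struct {
	SrcPort, DstPort uint16
	IfIndex          uint32
}

type BpfFlowMetrics struct {
	Packets         uint32
	Bytes           uint64
	StartMonoTimeTs uint64
	EndMonoTimeTs   uint64
	Flags           uint16
}

// readPerCPU is the old model: a PERCPU_HASH yields one flow_metrics per
// possible CPU for every key, and userspace must fold the copies together.
func readPerCPU(flowMap *ebpf.Map) map[BpfFlowId][]BpfFlowMetrics {
	flows := map[BpfFlowId][]BpfFlowMetrics{}
	var id BpfFlowId
	var perCPU []BpfFlowMetrics // cilium/ebpf fills one element per CPU
	it := flowMap.Iterate()
	for it.Next(&id, &perCPU) {
		flows[id] = append(flows[id], perCPU...)
	}
	return flows
}

// readPlain is the new model: a plain HASH yields one already-aggregated
// value per key, so per-flow re-aggregation in userspace disappears.
func readPlain(flowMap *ebpf.Map) map[BpfFlowId]BpfFlowMetrics {
	flows := map[BpfFlowId]BpfFlowMetrics{}
	var id BpfFlowId
	var metric BpfFlowMetrics
	it := flowMap.Iterate()
	for it.Next(&id, &metric) {
		flows[id] = metric
	}
	return flows
}
```

The pkg/ebpf/tracer.go hunk below applies exactly this simplification.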
2 changes: 1 addition & 1 deletion docs/architecture.md
@@ -11,7 +11,7 @@ flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
@@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
ReadRingBuf() (ringbuf.Record, error)
}

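Test doubles must follow the narrowed contract: one metrics value per flow id instead of a slice of per-CPU values. A hypothetical minimal fake is sketched below, assuming the repository's module path; the real fake consumed by agent_test.go lives in the test packages:

```go
package agentsketch // hypothetical package, for illustration only

import "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" // assumed module path

// fakeFetcher is a hypothetical stand-in for an ebpfFlowFetcher double:
// it returns one aggregated metrics value per flow id and drains itself,
// like the real eBPF map does on eviction.
type fakeFetcher struct {
	results map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
}

func (f *fakeFetcher) LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics {
	out := f.results
	f.results = map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{} // drained after read
	return out
}
```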
74 changes: 15 additions & 59 deletions pkg/agent/agent_test.go
@@ -49,11 +49,6 @@ var (
DstPort: 456,
IfIndex: 3,
}
key1Dupe = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 4,
}

key2 = ebpf.BpfFlowId{
SrcPort: 333,
@@ -71,7 +66,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 2)
assert.Len(t, exported, 1)

receivedKeys := map[ebpf.BpfFlowId]struct{}{}

@@ -81,21 +76,11 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
@@ -112,33 +97,22 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
exported := export.Get(t, timeout)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}

assert.Len(t, exported, 3)
assert.Len(t, exported, 1)
duplicates := 0
for _, f := range exported {
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "bar", f.Interface)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
assert.Equalf(t, 0, duplicates, "exported flows should have no duplicates: %#v", exported)
}

func TestFlowsAgent_Deduplication_None(t *testing.T) {
@@ -149,7 +123,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 3)
assert.Len(t, exported, 1)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}

var key1Flows []*flow.Record
@@ -158,24 +132,14 @@
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
assert.Lenf(t, key1Flows, 1, "only one key1 flow should have been forwarded: %#v", key1Flows)
}

func TestFlowsAgent_Decoration(t *testing.T) {
@@ -185,7 +149,7 @@ func TestFlowsAgent_Decoration(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 3)
assert.Len(t, exported, 1)

// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
@@ -219,18 +183,10 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
})

now := uint64(monotime.Now())
key1Metrics := []ebpf.BpfFlowMetrics{
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
}
key2Metrics := []ebpf.BpfFlowMetrics{
{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
}
key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}

ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
key1: key1Metrics,
key1Dupe: key1Metrics,
key2: key2Metrics,
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{
key1: key1Metrics,
})
return export
}
7 changes: 4 additions & 3 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default.

Binary file modified pkg/ebpf/bpf_bpfeb.o
7 changes: 4 additions & 3 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default.

Binary file modified pkg/ebpf/bpf_bpfel.o
12 changes: 6 additions & 6 deletions pkg/ebpf/tracer.go
@@ -308,25 +308,25 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here cause some flows to be lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var flow = make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize)

id := BpfFlowId{}
var metrics []BpfFlowMetrics
var metric BpfFlowMetrics
// Replacing Iterate+Delete with LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
for iterator.Next(&id, &metric) {
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
}
// We observed that the eBPF PerCPU map might insert the same key multiple times
// (probably due to race conditions) so we need to re-join metrics again at userspace
// TODO: instrument how many times the keys are repeated in the same eviction
flows[id] = append(flows[id], metrics...)
flow[id] = metric
}
return flows
return flow
}
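The TODOs above sketch the next step: cilium/ebpf exposes (*ebpf.Map).LookupAndDelete, which reads and removes an entry in a single syscall. Assuming kernel support is probed at startup as the TODO suggests, the loop could become something like this hedged sketch (names taken from the hunk above; not the committed implementation):

```go
// Sketch only: replace Iterate+Delete with LookupAndDelete so that a flow
// updated between the read and the delete is not silently dropped.
func (m *FlowFetcher) lookupAndDeleteMapAtomic() map[BpfFlowId]BpfFlowMetrics {
	flowMap := m.objects.AggregatedFlows
	flows := make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize)

	id := BpfFlowId{}
	var metric BpfFlowMetrics
	iterator := flowMap.Iterate()
	for iterator.Next(&id, &metric) {
		// Re-read and delete atomically; the entry may already be gone if
		// another eviction raced us, which is fine to skip.
		if err := flowMap.LookupAndDelete(&id, &metric); err != nil {
			continue
		}
		flows[id] = metric
	}
	return flows
}
```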
4 changes: 1 addition & 3 deletions pkg/flow/account.go
@@ -65,9 +65,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
alog.Debug("exiting account routine")
return
}
if stored, ok := c.entries[record.Id]; ok {
Accumulate(stored, &record.Metrics)
} else {
if _, ok := c.entries[record.Id]; !ok {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
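Since aggregation now happens inside the kernel map, the accounter no longer merges metrics per id; it keeps the first record seen for each flow and evicts when full. A condensed sketch of the resulting loop — names follow the hunk above, and the flush helper is hypothetical, standing in for the eviction code the hunk elides:

```go
// Condensed sketch of Accounter.Account after this change: Accumulate() is
// gone because the kernel-side hash map already carries aggregated metrics.
func (c *Accounter) account(in <-chan *RawRecord, flush func(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics)) {
	for record := range in {
		if _, ok := c.entries[record.Id]; !ok {
			if len(c.entries) >= c.maxEntries {
				evicting := c.entries
				c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
				flush(evicting) // hypothetical stand-in for the elided eviction path
			}
			c.entries[record.Id] = &record.Metrics
		}
	}
}
```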
20 changes: 10 additions & 10 deletions pkg/flow/account_test.go
@@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
},
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
},
k2: {
RawRecord: RawRecord{
@@ -178,31 +178,31 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 30,
Packets: 3,
Bytes: 10,
Packets: 1,
StartMonoTimeTs: 123,
EndMonoTimeTs: 789,
EndMonoTimeTs: 123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
TimeFlowEnd: now.Add(-1000 + 123),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
assert.Equal(t, Record{
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 20,
Packets: 2,
Bytes: 10,
Packets: 1,
StartMonoTimeTs: 1123,
EndMonoTimeTs: 1456,
EndMonoTimeTs: 1123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
TimeFlowEnd: now.Add(-1000 + 1123),
}, *records[0])

// no more flows are evicted
13 changes: 0 additions & 13 deletions pkg/flow/record.go
@@ -70,19 +70,6 @@ func NewRecord(
}
}

func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
// time == 0 if the value has not been yet set
if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs {
r.StartMonoTimeTs = src.StartMonoTimeTs
}
if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
r.EndMonoTimeTs = src.EndMonoTimeTs
}
r.Bytes += src.Bytes
r.Packets += src.Packets
r.Flags |= src.Flags
}

// IP returns the net.IP equivalent object
func IP(ia IPAddr) net.IP {
return ia[:]
22 changes: 2 additions & 20 deletions pkg/flow/tracer_map.go
@@ -25,7 +25,7 @@ type MapTracer struct {
}

type mapFetcher interface {
LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
}

func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
@@ -92,7 +92,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
aggregatedMetrics := m.aggregate(flowMetrics)
aggregatedMetrics := flowMetrics
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
@@ -117,21 +117,3 @@
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}

func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics {
if len(metrics) == 0 {
mtlog.Warn("invoked aggregate with no values")
return ebpf.BpfFlowMetrics{}
}
aggr := ebpf.BpfFlowMetrics{}
for _, mt := range metrics {
// eBPF hashmap values are not zeroed when the entry is removed. That means we
// might receive entries from previous collect-eviction timeslots.
// We need to check the flow time and discard old flows.
if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs {
continue
}
Accumulate(&aggr, &mt)
}
return aggr
}
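A behavioral note on the removed aggregate(): besides folding per-CPU copies, it also dropped entries whose timestamps predated the last eviction, compensating for per-CPU hash values not being zeroed on removal. With Iterate+Delete on a plain hash map that situation should no longer arise; if stale entries were ever observed again, a small guard could restore the check (hypothetical sketch against the names in the hunk above):

```go
// stale reports whether a metrics value predates the last eviction and should
// be skipped in evictFlows; it mirrors the timestamp check that lived in the
// removed aggregate(). Hypothetical: the committed code only checks
// EndMonoTimeTs == 0.
func (m *MapTracer) stale(metric ebpf.BpfFlowMetrics) bool {
	return metric.EndMonoTimeTs == 0 || metric.EndMonoTimeTs <= m.lastEvictionNs
}
```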