From b6e2b87386e7d3242fa079768539c5e854ed6789 Mon Sep 17 00:00:00 2001 From: "Mohamed S. Mahmoud" Date: Wed, 31 May 2023 08:46:23 -0400 Subject: [PATCH] change aggregation flow map to hashmap instead perCPU hashmap (#118) Signed-off-by: msherif1234 --- bpf/flows.c | 10 ++--- docs/architecture.md | 2 +- pkg/agent/agent.go | 2 +- pkg/agent/agent_test.go | 74 ++++++++---------------------------- pkg/ebpf/bpf_bpfeb.go | 7 ++-- pkg/ebpf/bpf_bpfeb.o | Bin 27752 -> 27208 bytes pkg/ebpf/bpf_bpfel.go | 7 ++-- pkg/ebpf/bpf_bpfel.o | Bin 27816 -> 27280 bytes pkg/ebpf/tracer.go | 12 +++--- pkg/flow/account.go | 4 +- pkg/flow/account_test.go | 20 +++++----- pkg/flow/record.go | 13 ------- pkg/flow/tracer_map.go | 22 +---------- pkg/flow/tracer_map_test.go | 21 +++------- pkg/test/tracer_fake.go | 10 ++--- 15 files changed, 57 insertions(+), 147 deletions(-) diff --git a/bpf/flows.c b/bpf/flows.c index ac4d710bb..f998e684d 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -58,11 +58,12 @@ struct { } direct_flows SEC(".maps"); // Key: the flow identifier. Value: the flow metrics for that identifier. -// The userspace will aggregate them into a single flow. struct { - __uint(type, BPF_MAP_TYPE_PERCPU_HASH); + __uint(type, BPF_MAP_TYPE_HASH); __type(key, flow_id); __type(value, flow_metrics); + __uint(max_entries, 1 << 24); + __uint(map_flags, BPF_F_NO_PREALLOC); } aggregated_flows SEC(".maps"); // Constant definitions, to be overridden by the invoker @@ -260,11 +261,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { aggregate_flow->packets += 1; aggregate_flow->bytes += skb->len; aggregate_flow->end_mono_time_ts = current_time; - // it might happen that start_mono_time hasn't been set due to - // the way percpu hashmap deal with concurrent map entries - if (aggregate_flow->start_mono_time_ts == 0) { - aggregate_flow->start_mono_time_ts = current_time; - } aggregate_flow->flags |= flags; long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY); if (trace_messages && ret != 0) { diff --git a/docs/architecture.md b/docs/architecture.md index f659b92a2..56fa49326 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -11,7 +11,7 @@ flowchart TD E(ebpf.FlowFetcher) --> |"pushes via
RingBuffer"| RB(flow.RingBufTracer) style E fill:#990 - E --> |"polls
PerCPUHashMap"| M(flow.MapTracer) + E --> |"polls
HashMap"| M(flow.MapTracer) RB --> |chan *flow.Record| ACC(flow.Accounter) RB -.-> |flushes| M ACC --> |"chan []*flow.Record"| DD(flow.Deduper) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 068eeec3a..e2b771559 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error - LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics ReadRingBuf() (ringbuf.Record, error) } diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index b615f680a..a8818ab93 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -49,11 +49,6 @@ var ( DstPort: 456, IfIndex: 3, } - key1Dupe = ebpf.BpfFlowId{ - SrcPort: 123, - DstPort: 456, - IfIndex: 4, - } key2 = ebpf.BpfFlowId{ SrcPort: 333, @@ -71,7 +66,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 2) + assert.Len(t, exported, 1) receivedKeys := map[ebpf.BpfFlowId]struct{}{} @@ -81,21 +76,11 @@ func TestFlowsAgent_Deduplication(t *testing.T) { receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows) @@ -112,33 +97,22 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) { exported := export.Get(t, timeout) receivedKeys := map[ebpf.BpfFlowId]struct{}{} - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) duplicates := 0 for _, f := range exported { require.NotContains(t, receivedKeys, f.Id) receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) if f.Duplicate { duplicates++ } assert.Equal(t, "foo", f.Interface) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - if f.Duplicate { - duplicates++ - } - assert.Equal(t, "bar", f.Interface) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } - assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported) + assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported) } func TestFlowsAgent_Deduplication_None(t *testing.T) { @@ -149,7 +123,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) receivedKeys := map[ebpf.BpfFlowId]struct{}{} var key1Flows []*flow.Record @@ -158,24 +132,14 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) { receivedKeys[f.Id] = struct{}{} switch f.Id { case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) + assert.EqualValues(t, 3, f.Metrics.Packets) + assert.EqualValues(t, 44, f.Metrics.Bytes) assert.False(t, f.Duplicate) assert.Equal(t, "foo", f.Interface) key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) } } - assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows) + assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows) } func TestFlowsAgent_Decoration(t *testing.T) { @@ -185,7 +149,7 @@ func TestFlowsAgent_Decoration(t *testing.T) { }) exported := export.Get(t, timeout) - assert.Len(t, exported, 3) + assert.Len(t, exported, 1) // Tests that the decoration stage has been properly executed. It should // add the interface name and the agent IP @@ -219,18 +183,10 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { }) now := uint64(monotime.Now()) - key1Metrics := []ebpf.BpfFlowMetrics{ - {Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}, - {Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000}, - } - key2Metrics := []ebpf.BpfFlowMetrics{ - {Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000}, - } + key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000} - ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{ - key1: key1Metrics, - key1Dupe: key1Metrics, - key2: key2Metrics, + ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{ + key1: key1Metrics, }) return export } diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go index 4eb412d70..575ad0681 100644 --- a/pkg/ebpf/bpf_bpfeb.go +++ b/pkg/ebpf/bpf_bpfeb.go @@ -61,9 +61,9 @@ func LoadBpf() (*ebpf.CollectionSpec, error) { // // The following types are suitable as obj argument: // -// *BpfObjects -// *BpfPrograms -// *BpfMaps +// *BpfObjects +// *BpfPrograms +// *BpfMaps // // See ebpf.CollectionSpec.LoadAndAssign documentation for details. func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { @@ -154,5 +154,6 @@ func _BpfClose(closers ...io.Closer) error { } // Do not access this directly. +// //go:embed bpf_bpfeb.o var _BpfBytes []byte diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o index 3f851fa3b967abc67d6c304185b236ec3f827939..178262cee0d96cf99054e00274c1c09101a17faa 100644 GIT binary patch delta 8513 zcmai34RBo5b-wStm3CKuyZYJHN_OmLOUNq$&iV&g)W%*rPAr0n6k7zxl(LR(Bx4gq zL~4gHvpc~Ak)cKIg^5cDBS?xx!n8`{8LKlP-b|nssRz`CPL#wADxniMq>1=xsy6PZ z-*?}6((_I-z2kS!_nmX@z2}~L-+gzjOEcEg>(F-2#HhliMg;A*0&~pnjB7dZ}M0SNQc}(wc-l!cUR_Y;( zwY~j(Snm(_Z@+)xIr3X__BzTkr5>d`=d$KNMyhbZ$j)mYQ8^;9tIt0>@srR4c7=7( z*XT^>NZ#+&wf=$L=(WwXlHP{SEWAq&`Hsjrk-ri7YmxsU@>e3?7Wqq&|1Q#->6wLJ zkn4H-L)ZUIEdL_%qR5|${E5gPi}W&gX5mNrDSKx#LVJ&f^CPU4`rMZmPE#I8>E`Gs z^PeHUH?C~T{y1wDxz|+=K~EiP*j*!3zb7O2W|T>OPswJE_r#LQ=4mn;X&R?ldL2t^C?Pcb;YiJzYWI{6ME>6Hz%jprw z*ayh{jPb73{Rs>Ip_z1dwumNJ{>xVQKj@G^b2`qt>xoIpZZ9lD|BgFUTe z=R6>Hx@$Op%V4eZcQ)j0IXMr~e&H;0i)2D!EyWmk^Zg%R6czOlT0^g=K8-oL#eHaxM z$#H=P1&;#X4~)hk>c1#B3w%n?u54`j3N+Y&p&~TTdR8Xe{-%z*jZIuUv@I(@^CB<~ zy0TH=S9I3hW&M*Lc3Z5A`USV;z#Fixg5~E`m@jZU9#stm1Rh)L>J#9#h;j@3Sg^@L zFJlckg&JV4$yyqKe~5}W1AI|uR<-2cGW#DJMyc`_Pb%|@Gys1eUUldNrTp&&gO^eNAU|dd`LvoXB^t54SmDp7(Tq)B)YB9C{*FTy*iAHcuBP?#hlBssEk0|0-oA99T z==G2?zvzeA01xcZoAvnWT_)UZ(3m2_yAtjnd0G=LaRIGvg%8&>4xEF;rt=Es#C4%8q%1|8`;j;c z{hvU_FxMRLLBWSqUd0DUkO9c}F~LWG2L;~+e4pT>z$XO5H~xU&yMYUWKMjmHO`k|s zd>EL)p+A4CsRHzmQJ+UOf+c($k>Q`hWdG32>YjpKW>)tMFb|F_+I}MffPxKIf4lTU zRTCcW4V3!BsEDCAZFvbnw(4)d?9@$oTs%C983dks%t-B~4)`;uI50{*4;-c&r9Q9S zw#Ie->qd<~S9HC&_u1PiQ04%o{z*O1){@UG1^S03Y`p+?wsHfR%P7_G=2F`RzPvKY z2J`lSW0DChC|_z$GEoiraXc;_&as?34Sgh0^TV*7XVIUcW_f zx8A*GbG{ed*wC=IKZf)t8AwE;|D+k!q?PO4h!9W`e@cuvK@&y~jL;H~h#rSRVuU#5 zP3cMW!-E22MY#l+HeML60}0+$&if28BB}(QAV#N{f8x7>Yk*%890R_fyVEV!4t*@W z#=1#Ap59QqA1*9>VbC~vK+nK-=U=q1vF_1n)W>yi`u+>zns{r|xr+Rw$juZ&zkn`~Fq@mkAEpDR7~d++i{%<5XdV)0P3$ireZTF% z*jT2m_&TsT2^BKO6>N(ft84n?VxZ)Wp+`BBWWM7Q$|**{F@0kv=l3HM;LE_f0(=8_ zHo$4`4RG%M!2oCf1sr&0fS)PnLOT>Aooat6_% z?TY`)%KrLFaLG{FyUHG;z00$gCzL&j0qkz!J<9$Ld9K8s9~H$T%05enu@Z;LD?4x^ z9uGGwy9j@0pYZ#X{Svvf5C1^fQ>($p#ZJB)So#b9C$`+RT>mN0!H?LPmIicShw~nf zbSis>mp2vQ_23dv%tkbjSuc&XmwT z;jb!t7E2P!3IC;Xeub2W3LY$_GDkqH;md{-|K>2*Yj{d@Fk0XQ5Mr0_Q z3Gh5p6z&qm?aJw*LpH!^?-qViIoBY7aBqMkL*W78&nqWG`(S|6E*XiuimNa(7#91?#$Hhn?%RF;*Rh}IKN+yE0WStP?bE_9 ztI%?ss+A{%zmEOK4k}My*`TW1<4Q_Zug9ydQ=!%dd|jl2sy|nuG`?q55|H6of~ryR zpTvHqeMa~@D%6V!SIv678tWh0hoiDeQd}ht)%(O@y$YScfYk-zUsRz7&|&qk@TYOu zVF{{bz-ph5iM@yyfOg41_3u>Z>np$|puABa;%bSgW`zokwcz(r#Ot8u0Tp@y9n{2y ze_4gban#nNOL#{3iz;-Mc9}p7AEB4<0j}@%>@iGe3K>s$0pwL|KYoZ|Mu|CbcnCkh zu$lO!swGn%#|Ep^2Be>*=0LPLg3jxmd!@^sXm8E?IJdYXH zQW1yZbijcfjs-Xa91n2pndD@Evwczc&C1%%0j2_+_UQm;|FiP_uVn)ca4F!x0pI&a5k9txRR2>x&_~gRG0YgS9S(VmXeIskz*65dd2=NWpBi; zPwfkEEO|ba6UE!OywE{XSVs>*_jLvOC)IG|X}(tJ&Y zE?0p_zcF&$0}Q_r64!X#e9eC)I{6<1u#DirWrNoz7Bk+p;QT>JK7cC7Pk6ixDP^(7 zQNg~*=Y$`j6@g*ayuOF=`DWq0kvtzhX1KXH8i&6VKIu7xg@;DLJMpQ0(6g6~m2mpg z{+eemN9R}YX^&T653!WbqUyx@`wCtNz{4!qzA@+V z#r6g8JO_LO0Htvh75V>?ffrGM*WvTb(m0EXd@~t#I%n9ulmWiVwoQU)sKIC#}0m_Eha=>0CfUbigg=+==pW` zixpqehX(cR-lQEK)tNnO?Zmv!?P**$?R%ncwv#@1Z}p3{Wl3K(t&4kFZeXzD$=i-! ztbfG2L>r5|E&Q{J^#yS8ui2;Fy_>JB>H60*CMWi0e#T_{v~~CT#xKYuIEwb7i(w0+~sT0-K8ac zldA{&`mMB{>C^e6xWZJ3pPwE`Yj;2&FR_!a!h?*mL|s&?@d)QF`3iml9Cyh@`zSDu zX!89KinrWg1Kw(b4XVe$r2&w+r1I|$euuo22K{R?;4AgcPh6dU2`G&xga@T&8WmUH z=NEx-xG(bef!VXzV}K?ypQpe%b%zEV`t;@0e9n>EmUw538cjqZI$^cD0o^+1eG%-oDnneVy~hM@`e)5h=cZxC4A?Vd}8at{dUEgYtkKS zuWql2*G;^6xY~-ZyI$?yv18pGT@)McSg#lEY<1Ul3frhY`{~aP9X_R~-BS#^F%&x_4l!-IUQI1D*EXqMjlf)R7}w?Y=?X4be2Bhsg?hifmj* z?%HZMx9V<+v>v`|Yeh5Hn59ck-_=>soHInVJ+0$MJMF#^-FABqn#DE i^m@EzP*20@U{OboZC%|n>1p{N8YWM!#~06z)c*mNTE#p7 delta 8905 zcmZ`;eQ;dWb-(XJyIOti>T_3XgP$zhT45VxBPj~lAdHDc<3#coHVLidU|BL4EF)9f z*zGEQV>~Tw)yf?^gIlQ`hY^%c1a*o4rx6J)N}-7$ZsWGesK4KR z=Sj~y;f~)uzu!6Mo_p`PAMdR^aoL)gvIcLqduE<#_(SWhXU?6pUKw3y#jW^XDV49z z_bC?Ze3$-`eWwGaFXeBnIQHV)F!lW}&P|DYOK;5I6rJ&z-r)R5-y1Eq?R~l{y6>)s z=FZTtqtCvdGEb?mQ!Yd&Qz|-OMDyyesZJuXt79*YzT+OZZQEtE?0huqD!mYGh4JoU zjX|B6`$Xi&B4+?~`*WA|sjNqo>#5y~EqwaP!3P*H;R z8rHB6DwncnkTvWL%mnP3HS8U#fU>89wXqixv~Ob#djr-NrKf>4?5!$B(B7D(P--mc z%Y%K&ChF-koQ@7r`ys;>xG6op%s)7wEGpXDj9Q64(Y{Hz-(jM?N%)|ViS}oGelXji z$_VF2?x}L}_N}ZCPd8&eF%nS>9nGptJHerrF1-;pPIr;?jG4`JuCd#mAn%$0+nvr3oiy5HDG{-sh z)D7hAraMTi7z;Wu`cUc@>*GZ`W2tsqAz4|}YY2Y!wK@+Bir&z+@w_{`{+ivPPvcctU82b7BAN&db*k{+}2Mq3vB z(#qq7k_&x3|Hkbza$Tx5d)_)21l(M-$;A6D{b4iH1fSKo7-~nK6bLedk z&y9jxf$If#0pBcm0QfV4hta=%r;u^r&k3Fe-r{2i!u~RIZ;Ffw9Z-hJnX) zX2~XNLia7HwcgavEvb$D0M;5<-qzzwl6MJH_d$z|@2oYOSE-fg{ADaFXN8q_xlGIq ziu)=m;s)S1b;_&FzGH3$>>#BgqbHQPJaDl@z6-B1L_vxCtzgi|_lWVJS%lGU!GpaW zc|))Vd|5DhiTn_l{;gO?j)b@k`u8B$kahsDti{s>!w33xuh#mxp7oN|+$4>7P$K__ zirB;J#?oF*a`&fLV>e`^q;|lX~Z}MJ|6JxkQ^e09=lr))oj8@ zrfZTr*w<}%P|Qe(2jKGs;nG2;5|6kiOVDyV=oLm^u+ssuLyn! zcv|qo!0!v*4?H9I5nwJ7+dc|BEBFBLRbURTZ(+Zv0qD6EZo#=$g9_g%7#aTY>i)Eu zm1C8InU(W`xff3>+E&qkg5y{Is>Gp+mkoCTN;x++G4!S_HZ51RJo_&IpU30j;R)v; z@RYv|xtTg(ezY=;DCPf->I(*{@(&@u=ws8!21eWYQv_Nb%pRSEeae3gimXJau88Q4 z<+YD|Yhmo4mIHIUGnrl}nGJYNeQ}{}BbeeXD^~(nRev_EF2q3c-oc!e8Obbq?IX4 zBF!k3KPJY80j)H8VC18cXNh{8{>onwr_8!lMG%KK0=kUhWN?lfApCVGb9^U$1C(Jz zg-TT|Ax0oNxeZ210(pTWZ@;i9YxayzuIBaj~ z8PxCUM5fOAiEhc%E&ouae!r|Zwk}K6|DZ~v=_F^NPh{5G$(X*7Sz#w{)YF+2WyvP# z;4$N`lACp^{$@M5MYq*2$-`WeJN5qht#-0gkJnd~Bzrh#aeaLB{rab^(cfEHP@w<3 zX{A2dRCV*dy`><;2Od4pd1&7w-JOT}z4hz8 zhI;Rq9&4(8wz*zkYO2UHUfOA1p(mO$qxH={E6Bc%BZ$3XL-MTHA3-7srXYn5Z!4}h z%PI`D1@d#)Qe4u&?=E2MG1FFj?O&p8p>nSki7nu0XJVd*9wloW?@`VamD5oPel5h&fdwDNcmGW}51_pz@$+hx(?vdu>yQq44=QI5 zoGl4pIKHEpO=H)}*^hv>C;Vk)4`a96lA*jAWxqgsi*Jt-0b zcP4y?V8qU(G@t_mUh{eWDrH~3!Q2HVBZlMbyE35sZOWdaJt6+&w5NnWfjy1^xRSwq z@-nQy+aeCnD|?a-ts#yZ$K56TlCrO0NnA-`{+XvrvejlFZew6N6Ja%Vg2)D#<4e)vltE$;RTh-@sPsEl;6N`3@|b+{7&U8Lkc6a z*LM)P7P2E=)blwq<}@Hf(M*VEk)mjWD7Go5kq#{(PJ65HCzO-H0HSRnjtoV+gg>vG zRkZhpIPC+%FDa)58H`GXn87Sl7#;Q<3Xr1b!1IF{zp9+8De!5r{}kH+0rO?X1+KQ;K8mVj_D_JSb5 zYsy`Q3>Ea{_y_F+|4FfnKi*CLfM~D7!14o*{l@_m%7}~bYdnQ(w{RwK@a;u|*9R;L zcYwED(Ufm5x=p#62D|`ehDGSG0r86^LP!`b10@Z0g4pj`%3N>2Nr z*bgeJk?~{#rR3Q4$>ES48A*vbHmRo4o%^sh~qg$bq~|_^*}q02}m%IPHBoe8A^rNoDP!|EUnC|6mRu&Wd7< zvi7sV*$`)gksLlQyj@v`=sywS^q4t9-NA7^$}_Cin3Q>&8km@ICgRMpwG*1 zP<8`$ef3a?W6i6Fh3`~$BkgBHoc0UCpH_Ao<4=TmG1k9&(pOXzD0_1~9Hv6t1D_V& zqU;XZXNB)ob^__b)nGDGL5=~ZC1VxeQFa9eltvbqF~c#SbV~TU%1+f_{nO~gI8Z@{ z2A@}!DtixJHtE(7$1jO=o6oB@D!Ui&igZVaBUXBs@T1D!PkWcom6UX^&*L8}`$X2l z1%!_T^QB3FktX6QuPF50Za94~6Z7qk-vRh^Hh+P1z7NSVhzB1lE`HCkAgofW!QUeQ zN2k~EBR z+JyBFQpEdRZCc0x?{o4XAn$APsLY7h@pbkYpO^j(BntZr+f$y;OMgTI?5(J>2sq!M z3yM$&sR5q{14s?~d_KUq&&xg!iISQ|h4y9p$Z%3vk)I*}_fV0)NCu7xX8BjeAOqFI z(1HhhfJamPR>(f>^T5udrrHFQ`f~e*GXAi&jzNQ0 zwJAsRWJlFnzSVDe%rU`;f^HlnxOEPUC=msN_T9nN|!L6 z6l)`Z#4c{pGaa?-|9(NkkDs7eZ(H~R7KZ!C6ns@%+kqrPd`P!#xxIMLf~IFR!*%L3!EkRGJTdO*LlZAs};SU4dR z{yI>#dhzx}*%6>+c*MewUga(Dk|AJxO9pm+w1)jlX287#JHJ8s9}0m#zd6JH@jh^G z!LEOIdrkHwa4w^Z2c_f!D%{%h{1WiW5WfPIsw8}@H+VSPW9pg|~cw_{HlR11khtFQm`R6CIw$i?c zBP<_XxBG;Rv(Hj;4ppd!l1n-Kat?n#;F;C+D{onKQ@pf%^tA_ztwf93ykWzwookI; z)44`}uxD9jZ4PKW}&cSijLzsn_pa zW2gIc-`-|>$B;g|_YONfqGurPYtxw?uu0tuao;X|mTXqfKrGs$GrbVKy0`a^fpsjUvN+Qw>UFgPQfsSUNJB~GeXb&NGt`fyB>TH~XZws!3Q@3W5GUdQR4 zIjr@qwf6I@z0U^rzUmCT>9dGp}Mu&VCOhef`kFU(w#c~nTRj=YgOg%L`9J+9O_u2TEE^yVq;XaDGj zCT7h^3t>|#Lw$=vJqZdrGN`7Y_r*}>?QSy$`dMzS89o;6b*H|L;J~IEqFem1Rrfb+ z1g%z4VAw{e?%wx;k@dbS@`T8DME*_W+amue^0>%jBL5;%X4?CfVVxOSe&*}q`n|{_ zB3~2vJCU!7l%?x^g zS(fJVpwjhj#3=2s8-FASXkHNbX~+9e&3O_Z33_WsH52dfC;T(S?@TjFyB~HKooWot zXG=gvMi2mD8i+;%|9Rq%2CtwUX5&8zIfT-@G4RuF2GlByehkkKcnSs#lk)c`1H(3q zlmXqf!OU!X!q|fq==LNYVeGiuV>8OEoCo>I1l*HpR{vYEQp4ejH3tjts}0+YT}_vf z?Y}$V{pSRd1FMJHU0Y%dw!PNaXMV<>GWO!n*ga$Cbg*ws`@T}_Aedp1zE`GNF;8E*SzWr7vhFF8TG8L$_;YGZFfRXWTJy&x&<3LgC39=pSct%l4D zy&%bKdt9%}FDf(B@q!HLjt7~u4e!YeJD#C`nf=3Qx0*K9evPx|#+;>2@9@5yS=oBD zq%?Npbfx&@={HOEJI?UKrT=i4kI|IgGiy%qEYxInj^eokt(s9l?$QTmmG}=rt3WKh zk3kO>RjDD-J=lLLdL#6oi{1wPkm!BTUlx53`X5Cfg??Q03Fse+&Q$(5(0#>(oQ@@B z$Bi_K-Ui)+o`k&*dQ$APr9^LpOUa(Mp)iP>gVLuUbI<58jiQs}vO&ceu0vQEK|1LzwV z&4GRh`l~L{eQxJIw&ldaFkts z4ErK6W?+9%_wY;$_DW`$tp6`Qg`7u~9Vygh5~fYHMscpS2&zK>J3a$C=j!1(4>~Jw zeyjA`6)ToWzs!3U9YZO?Z1fExEwW^_LN+4=Eel<=Rh8XM9Df2KbE0$ z_{`frA3A5pvPyK0Z8d&`OMI#5jj&${olUi24p)i35g(&yhd>AB*D3ln(7Qz64E;vY zw?OX^y%YK^qHl%XC;GL}cZhx+^nU0ZXcrzyA+w<0G4^Jq2H?0GFNg0h3;O5H?E8ob zXSN6WO2jte`7vKCUO5wKoql(AQSsII8;(Ngmxz8U^e>2B2fbDF z2Iy-dg(BxlG=n#wHt2%tInE}%6hE)iSL5e4eP{KYDj#v2`{jt? z>t{l;+zC4`J>#f+Poe%@^#b?)LY-4niaYFlYbopaCofKJUY4oN|{F+q$7^{ie( zuDU{fVDS?Da$N!vw~o4MeXPE2xU&98tbMZ5Q z-p+^oF4xOg@wJ7-rZja?-4qA)3h-ndjT&Fb8DO@#F9`D$9nE;uNaT~id%&^Lh~n(& zCl`U4?;)3fc@qnHA-K_E`dciX5AL#B{-Qa@Oh?=xG22Nm!*xoy=--h;u=ON8AHb&G5{!YPwtuQ1yA}W*1 zFI)(Y3*QfatP3k3d;nbbkZVqV#=l4ZURO1Vzks`X#8qpBo5??O&E1Caq2C{pfSnA$ zjRkeg0K$N=v<4EW%m!v6-!evBg)thwk^Btm3qK34KxM45@smrzW5Rz2&pYJ$>aZBX z4za4H0s}ITm6=#NcDNYXdGcXb@roSs3~)kt9sNgKoRa7F$EG`9~5MBxYyc7blQj8Bf$P}hO1O9qpvx1ATODy(h zz|bTHvxN27HWtqSuN5{+I5(o&EY1V>2%8nG#kR6I54>C0tRU{LgTh!r-z?#X7@ra4a5GEyCNU zVgHxsN7bMh9zj4EGO7_Bv4QPi`UiwV|Dfk4F8>8x|@Vk)Z$nX=w{tSE% zOV%PK`CY__!hjy(1(5kY=s@^($n0C9YOnAEkhwViM$>pqnD*S==ur6Ikh30&s>y(T zMMEhJiXUSPU>0N&dnP4(3uNwNQB^B^FJ!?G-f0@Q2tP&t6H(P8T!0VN%x9vi1D6f9 zzW`i@JvfQ)z7xUwzXFDA{N@zr_Z0zO=a=nB)O_j4AAsGXj;a@b44=B<4;N!O$lG8Mvxv*E_1;;uv_q;#}~c#R>3Ui}S%l z7PJ0{a1!n3H#%w*>~n@R#9G{tpdhFeI#i60m168>kiD#tzovl~^1HH>Pom@Gi!;IclxNad2CaeE%6hLze_R z$_gELM;6DyeQ7)(e1P#?j@oT;96XrDL&ASzd=FlUZyDk+jHdCpu#0PKeji?m#c}XN z8lMm@XM8_ifyHq!|9Zw1r=RS{#n8wKxSpgej)QB3!=trDINTLo!dIYv32rBS7H5O| zg&&8%e37emTU@pX-~T}|9A`kitA>P=*smoQ;R0cCHh4_<0r;z1Ts3a72R#FzxL^o}D^f4~F#IQX;67n- z4!A}5Fb_Iw!H#E^~cQqzSChj7?Im#{gQ`dxGN#;{G{z<+=PD(j>FU9-s< z!n|tFMPwhI3OuiZSqAWseS8grg4~z~?YJL$pAf!A{0Veai?}>| z+F3e;JB3FA_SLV%FeU*Z&-WFUze6}&kv`$Dy+Pq{LOgz1KlJl-U4j19N*o3( zJU;D!IQA~NTl}@cVSI~l*g=dd$x?>=_<1MCid+pZR4IM%x)z9&ata7lyzCBYUfqg|@R&XdwEr;EKlY)_oG ziJj+&U4s9k;a9e__=7?G0(17;6<&~rn5cHyBKDO2#U(|T^T4%Brg(Wy+T~oaccsPS zJC72avv&M1w9{IoM_boADP6qgUZ-0>zNW}&)d$xkeeP3U`dL^#iHBEamXLX64!IHv zugD>LPl@@aZwHD7&szIG)89W3urEzN-cM5aB zDZTZwWlrAkPcA!W+B*Kph=mTG4r6KjXNx)i57OBD!B*X$-Shpv?>n#WJnp$-;QP+dC1)V*raO;S_d9PL8+qJ0KeEOtb>iPtQKi(k zbChzJPtjfOh7d?!jNUQp$l<|{pjDZ}6 zj4|xY`F)qgBI-$IR_z8=4w_sAamn#!qhRYyM-Gp?A3p555mTG@acv*6YpAV~8Tx24 zmFfDx??%@R$*)WPQ_25Z^8b?jn&ej{e_!(VB$r`z{lu`2hBn^*12MfL`DMx9mi#Tr zFG((O>U#4d{hWJ8QKsu>#&Be6zsY;QaopssJ$iZUKJO_rk{&l^@(u)zlhNbwSEIi!7<{EBX z;5+oVGkkjxnVjjq{UP6;g9h2Un1FBU{KAt+s}K4{CPk7n9qmMOdZ4{MO+6SdKxb_ z7Tcl5I1HGDl8G1nhCOZUePS>C4SUMi9}zppjR-w%GQ|k_kw}>K)mfQ4M#6-yevV(@ z%Thq5Cet+AC^CKi5YHYHJ4VC~XX5^%wDlZmChjj@Tc0xeD94YUSru_)nK5OoF{S%$n7Ez zLgul;&ZV%_z9@1%hj(eyzx0-1|u5b{~QuXM9B zroUBM;e1bDDy?Y!KCET1UP6x1=E^!$Kt@Cb_In|7sVzs(ka8E{=R0T{80Hdt3Gz}} zkj{(Dt@=IvK&rxf3mT*-7v%`H7PjwEZtWK!`wLoyUlbXx;g?0`iaQVaHsoB`Z-|_N zd{Jb!@{ah|L+0vWoeg-dK);OG1YlZ@rv=YXbbeWd_fweVy#<}(pu=qPL+Do;jZ(i9 zIR$q7-5(7W>+kZPhuw`LN7E?FMVK|zYQ?cu!f7=e*zj$TIo1@OHIP|=aHsIRByv6ES0Qu7v|tRci@Y0elJG@=PK<9{;YJ#s9m`UyxXE37Rd?F#P~>usHt{@;$yb|lc#!T=SgR3s;!j}MW^@QG44ZRRWc1AWP-N_2 z=YK`UkdjkyY2mJBJ)5wJFl?@RzfzgQ&@4Dn#)ZT!6&Y_{r&?s3nNE$!Ga%Q^(SNkC z-1!InmkSp<-`3w{UJCpZAs|QS1on7CiI_G z&6~pe&aQ-hzG~ZoW;mh}^i!z^`GdsKclZ^UQ}N-Es_L&fBQGzSI#qvIyGXxYJ7?*h z_EfsHqvN5rj@E;1&Fy;+99ptIbFlTHgU$O6>_5NB=7X8kx^=0Vs?-sE+p3Lv z<*I^+anYYyRjKP%#dT^`a-?U~)v4YY#PSxN*DzQvt!u~`7p7`F8$u=yh8N?v3B9`u zc?6Rm_|Vc!6XE7tuh~SjZef{lXR47Lk^ay6%bM51~lz4k><*u>g4v zI4-QgvAu|Z@OQwagRZI*o}~Sxs~Uu-@f0}as{4fRB%gN8{l^SY?B1|AcvJx3V>%F2 zFqTz85|LsByt7BnxT;gQi#&?*!lU5wbFLb-crF-0Hs!CvzGTex6r%cx1}7|s2wIOK zbZl@jR%gjYSH&%!1x^ZYr+wTtH#_>1XMxjVKMbCWRbFH9%-1}GOcZ}ZhYPN1vUn!A zQ}`V0b1ox`Ue&TnHuog9f6)v`1jC5jGL5!LG5`n*~L^C`<@fhE#*a zv%pQlCWPsby3gX7;BH|P!NnoBKBIubA4vWG=k7Iw15hJu5(Rpz4 zCPpm$9(d+0j2Mv$?D?>#gt7ZfN(<-UV4EKet9s#G;8<>0H3;`k!~QQV2&)UCI0=Wc zcvz*-5F6+P(_Sz9D!62RSoH}9{ui?RC$jwc2_4k85cMSk_N0NdC>*?JsOe*lg0?~S~TUsr8Qx7T=)dIU}ad13VZ(pMP6N4jf;Xj7d$CE zA9-FAB7%1-D|`T4))H1}VGmr~8CG>!_F>_#!(Pyj`h`DWeP0Qy37x7?xz2bCOwFpnL{F$)o6b>PuJ&GwbjQI}~{o+tX zhjU>yEc_VqqKlXUT($*+L(E+gc^H^9!duz6F`-hx-7`>vz9#r`n3 z@VcY=$T)u#6?stfiv#~cRd&-+$1N@e4+^iL{g$JKEiMJ07Vd#PZ>6I~EslZ5EY1gy zgFW_;1(HxqSPli?NsC#*b>XkVzhJGSZdn`$^Y2oukM<-uZgD9%X>l<)mBjuJ3b2B- zIK03H8XQ$)aU5J{aT45MaVfaT;$rZ97PEY(@Q+#l4opc>-v3m@p?Jh{AomOZiUsa> z)PTit@bN4j5>DWjTCf{aWN{pPI*Z3VQ7mJDy^gwIaU49J#goGP%W}a%Oo_#D@U1M4 z;*X1r@P7J#5mRDu9PA~t6lvidW73MBQh1+ zxoQjcU(`~hw%~#xTt|l{TquOw!G&>LAS})U*9kujdszYQ6Bd_(n}pw{J&7xsu!#uP zpXwGxK0b#^cDt(2;_2Xi;XCl@QQhgP0gF@M*xK`02 zU|z$>`FNKz@!DlGFDZ<`IkEE{PYxo$H>Axx-Zxuz9_O1m|3L(Je$yeSkmoZw7$J{e z+aAYWCw~iv6H{6^DBmFbL$P;e`S*i8yMdvs3dXW3m=yjDwklH;TZ`rQ2&aYng&Tyw zA_DRczq-J=WU>4{$0@z7U>^tYj=|R?(Ki!9MM;{C!F2-7uzem3OMl# zf{DKK@$ml46fp140WW~S>vF(l5P1C!nB{rh4VeD^pMCuL=+E0L6Y4Kll@NGcWnzOY zz}qPkxlF&fqr!Vms^o!Njpww*i@@hBW>_v;%ocbtW@34EG3XMPH$#vsEt1)Tq{V5a z)~orJf@9!~oQVab1}@kAE$V~L67 z@zz&Se8Vsq9(lC&OYWynT7zX1r?S`%7VQ_a>{Uyv7uPIVG$Vi3$kg4looJofvT@_G z=4E#o z2)54ku_6Eeqg7hp=v*DUAoPJ=wr90Fj9T`rkDc4-xA(68=APBD?p;0&>Kky0x9jSz X_35Flz7n5;CNU^{n+1nmuxtMhTJNP+ diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 32b2641ba..4efa75bbf 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -308,17 +308,17 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md // Race conditions here causes that some flows are lost in high-load scenarios -func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { +func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]BpfFlowMetrics { flowMap := m.objects.AggregatedFlows iterator := flowMap.Iterate() - flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize) + var flow = make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize) id := BpfFlowId{} - var metrics []BpfFlowMetrics + var metric BpfFlowMetrics // Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions // TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively - for iterator.Next(&id, &metrics) { + for iterator.Next(&id, &metric) { if err := flowMap.Delete(id); err != nil { log.WithError(err).WithField("flowId", id). Warnf("couldn't delete flow entry") @@ -326,7 +326,7 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics { // We observed that eBFP PerCPU map might insert multiple times the same key in the map // (probably due to race conditions) so we need to re-join metrics again at userspace // TODO: instrument how many times the keys are is repeated in the same eviction - flows[id] = append(flows[id], metrics...) + flow[id] = metric } - return flows + return flow } diff --git a/pkg/flow/account.go b/pkg/flow/account.go index f1eca9701..a38b8140a 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -65,9 +65,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) { alog.Debug("exiting account routine") return } - if stored, ok := c.entries[record.Id]; ok { - Accumulate(stored, &record.Metrics) - } else { + if _, ok := c.entries[record.Id]; !ok { if len(c.entries) >= c.maxEntries { evictingEntries := c.entries c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{} diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index 160054932..c53df299c 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1, + Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1, }, }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), - TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), + TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond), }, k2: { RawRecord: RawRecord{ @@ -178,15 +178,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 30, - Packets: 3, + Bytes: 10, + Packets: 1, StartMonoTimeTs: 123, - EndMonoTimeTs: 789, + EndMonoTimeTs: 123, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 123), - TimeFlowEnd: now.Add(-1000 + 789), + TimeFlowEnd: now.Add(-1000 + 123), }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -194,15 +194,15 @@ func TestEvict_Period(t *testing.T) { RawRecord: RawRecord{ Id: k1, Metrics: ebpf.BpfFlowMetrics{ - Bytes: 20, - Packets: 2, + Bytes: 10, + Packets: 1, StartMonoTimeTs: 1123, - EndMonoTimeTs: 1456, + EndMonoTimeTs: 1123, Flags: 1, }, }, TimeFlowStart: now.Add(-1000 + 1123), - TimeFlowEnd: now.Add(-1000 + 1456), + TimeFlowEnd: now.Add(-1000 + 1123), }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/record.go b/pkg/flow/record.go index 4dc8d2a98..60dc0148a 100644 --- a/pkg/flow/record.go +++ b/pkg/flow/record.go @@ -70,19 +70,6 @@ func NewRecord( } } -func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) { - // time == 0 if the value has not been yet set - if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs { - r.StartMonoTimeTs = src.StartMonoTimeTs - } - if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs { - r.EndMonoTimeTs = src.EndMonoTimeTs - } - r.Bytes += src.Bytes - r.Packets += src.Packets - r.Flags |= src.Flags -} - // IP returns the net.IP equivalent object func IP(ia IPAddr) net.IP { return ia[:] diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index 3567592ce..563c2850b 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -25,7 +25,7 @@ type MapTracer struct { } type mapFetcher interface { - LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics } func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer { @@ -92,7 +92,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor var forwardingFlows []*Record laterFlowNs := uint64(0) for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() { - aggregatedMetrics := m.aggregate(flowMetrics) + aggregatedMetrics := flowMetrics // we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored) if aggregatedMetrics.EndMonoTimeTs == 0 { continue @@ -117,21 +117,3 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor } mtlog.Debugf("%d flows evicted", len(forwardingFlows)) } - -func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics { - if len(metrics) == 0 { - mtlog.Warn("invoked aggregate with no values") - return ebpf.BpfFlowMetrics{} - } - aggr := ebpf.BpfFlowMetrics{} - for _, mt := range metrics { - // eBPF hashmap values are not zeroed when the entry is removed. That causes that we - // might receive entries from previous collect-eviction timeslots. - // We need to check the flow time and discard old flows. - if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs { - continue - } - Accumulate(&aggr, &mt) - } - return aggr -} diff --git a/pkg/flow/tracer_map_test.go b/pkg/flow/tracer_map_test.go index 4486992d1..9ea7c1680 100644 --- a/pkg/flow/tracer_map_test.go +++ b/pkg/flow/tracer_map_test.go @@ -11,36 +11,25 @@ import ( func TestPacketAggregation(t *testing.T) { type testCase struct { - input []ebpf.BpfFlowMetrics + input ebpf.BpfFlowMetrics expected ebpf.BpfFlowMetrics } tcs := []testCase{{ - input: []ebpf.BpfFlowMetrics{ - {Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1}, - {Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - }, + input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, expected: ebpf.BpfFlowMetrics{ Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1, }, }, { - input: []ebpf.BpfFlowMetrics{ - {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, - {Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1}, - }, + input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1}, expected: ebpf.BpfFlowMetrics{ - Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, + Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1, }, }} - ft := MapTracer{} for i, tc := range tcs { t.Run(fmt.Sprint(i), func(t *testing.T) { assert.Equal(t, tc.expected, - ft.aggregate(tc.input)) + tc.input) }) } } diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 0943b7673..495acfca5 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -13,14 +13,14 @@ import ( // TracerFake fakes the kernel-side eBPF map structures for testing type TracerFake struct { interfaces map[ifaces.Interface]struct{} - mapLookups chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics + mapLookups chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics ringBuf chan ringbuf.Record } func NewTracerFake() *TracerFake { return &TracerFake{ interfaces: map[ifaces.Interface]struct{}{}, - mapLookups: make(chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics, 100), + mapLookups: make(chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics, 100), ringBuf: make(chan ringbuf.Record, 100), } } @@ -33,12 +33,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } -func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { +func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: return r default: - return map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{} + return map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{} } } @@ -46,7 +46,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) { return <-m.ringBuf, nil } -func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics) { +func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics) { m.mapLookups <- results }