Commit

add NPM connection batch extractor (#28174)
brycekahle authored Aug 5, 2024
1 parent 66de37b commit 7424b24
Showing 5 changed files with 218 additions and 179 deletions.
111 changes: 111 additions & 0 deletions pkg/network/tracer/connection/batch_extractor.go
@@ -0,0 +1,111 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf

package connection

import (
    "time"

    netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf"
)

const defaultExpiredStateInterval = 60 * time.Second

type batchExtractor struct {
    numCPUs int
    // stateByCPU contains the state of each batch.
    // The slice is indexed by the CPU core number.
    stateByCPU []percpuState
    expiredStateInterval time.Duration
}

type percpuState struct {
    // map of batch id -> offset of conns already processed by GetPendingConns
    processed map[uint64]batchState
}

type batchState struct {
    offset uint16
    updated time.Time
}

func newBatchExtractor(numCPUs int) *batchExtractor {
    state := make([]percpuState, numCPUs)
    for cpu := 0; cpu < numCPUs; cpu++ {
        state[cpu] = percpuState{
            processed: make(map[uint64]batchState),
        }
    }
    return &batchExtractor{
        numCPUs: numCPUs,
        stateByCPU: state,
        expiredStateInterval: defaultExpiredStateInterval,
    }
}

// NumCPUs returns the number of CPUs the batch extractor has been initialized for
func (e *batchExtractor) NumCPUs() int {
    return e.numCPUs
}

// NextConnection returns the next unprocessed connection from the batch.
// Returns nil if no more connections are left.
func (e *batchExtractor) NextConnection(b *netebpf.Batch) *netebpf.Conn {
    cpu := int(b.Cpu)
    if cpu >= e.numCPUs {
        return nil
    }
    if b.Len == 0 {
        return nil
    }

    batchID := b.Id
    cpuState := &e.stateByCPU[cpu]
    offset := uint16(0)
    if bState, ok := cpuState.processed[batchID]; ok {
        offset = bState.offset
        if offset >= netebpf.BatchSize {
            delete(cpuState.processed, batchID)
            return nil
        }
        if offset >= b.Len {
            return nil
        }
    }

    defer func() {
        cpuState.processed[batchID] = batchState{
            offset: offset + 1,
            updated: time.Now(),
        }
    }()

    switch offset {
    case 0:
        return &b.C0
    case 1:
        return &b.C1
    case 2:
        return &b.C2
    case 3:
        return &b.C3
    default:
        panic("batch size is out of sync")
    }
}

// CleanupExpiredState removes entries from per-cpu state that haven't been updated in the last minute
func (e *batchExtractor) CleanupExpiredState(now time.Time) {
    for cpu := 0; cpu < len(e.stateByCPU); cpu++ {
        cpuState := &e.stateByCPU[cpu]
        for id, s := range cpuState.processed {
            if now.Sub(s.updated) >= e.expiredStateInterval {
                delete(cpuState.processed, id)
            }
        }
    }
}
70 changes: 70 additions & 0 deletions pkg/network/tracer/connection/batch_extractor_test.go
@@ -0,0 +1,70 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build linux_bpf

package connection

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf"
)

const (
    numTestCPUs = 4
)

func TestBatchExtract(t *testing.T) {
    t.Run("normal flush", func(t *testing.T) {
        extractor := newBatchExtractor(numTestCPUs)

        batch := new(netebpf.Batch)
        batch.Len = 4
        batch.Id = 0
        batch.Cpu = 0
        batch.C0.Tup.Pid = 1
        batch.C1.Tup.Pid = 2
        batch.C2.Tup.Pid = 3
        batch.C3.Tup.Pid = 4

        var conns []*netebpf.Conn
        for rc := extractor.NextConnection(batch); rc != nil; rc = extractor.NextConnection(batch) {
            conns = append(conns, rc)
        }
        require.Len(t, conns, 4)
        assert.Equal(t, uint32(1), conns[0].Tup.Pid)
        assert.Equal(t, uint32(2), conns[1].Tup.Pid)
        assert.Equal(t, uint32(3), conns[2].Tup.Pid)
        assert.Equal(t, uint32(4), conns[3].Tup.Pid)
    })

    t.Run("partial flush", func(t *testing.T) {
        extractor := newBatchExtractor(numTestCPUs)
        // Simulate a partial flush
        extractor.stateByCPU[0].processed = map[uint64]batchState{
            0: {offset: 3},
        }

        batch := new(netebpf.Batch)
        batch.Len = 4
        batch.Id = 0
        batch.Cpu = 0
        batch.C0.Tup.Pid = 1
        batch.C1.Tup.Pid = 2
        batch.C2.Tup.Pid = 3
        batch.C3.Tup.Pid = 4

        var conns []*netebpf.Conn
        for rc := extractor.NextConnection(batch); rc != nil; rc = extractor.NextConnection(batch) {
            conns = append(conns, rc)
        }
        assert.Len(t, conns, 1)
        assert.Equal(t, uint32(4), conns[0].Tup.Pid)
    })
}
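The tests above cover full and partial flushes but not the expiry path. For illustration only (not part of this commit), here is a sketch of how CleanupExpiredState could be exercised in the same test package, assuming an additional "time" import:

func TestCleanupExpiredState(t *testing.T) {
    extractor := newBatchExtractor(numTestCPUs)
    // Seed per-CPU state whose last update is older than defaultExpiredStateInterval (60s).
    extractor.stateByCPU[0].processed = map[uint64]batchState{
        0: {offset: 3, updated: time.Now().Add(-2 * defaultExpiredStateInterval)},
    }

    extractor.CleanupExpiredState(time.Now())
    assert.Empty(t, extractor.stateByCPU[0].processed)
}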
133 changes: 20 additions & 113 deletions pkg/network/tracer/connection/perf_batching.go
@@ -12,16 +12,13 @@ import (
"time"

manager "github.com/DataDog/ebpf-manager"
cebpf "github.com/cilium/ebpf"

"github.com/DataDog/datadog-agent/pkg/ebpf/maps"
"github.com/DataDog/datadog-agent/pkg/network"
netebpf "github.com/DataDog/datadog-agent/pkg/network/ebpf"
"github.com/DataDog/datadog-agent/pkg/network/ebpf/probes"
)

const defaultExpiredStateInterval = 60 * time.Second

// perfBatchManager is responsible for two things:
//
// * Keeping track of the state of each batch object we read off the perf ring;
@@ -30,27 +27,19 @@ const defaultExpiredStateInterval = 60 * time.Second
// The motivation is to impose an upper limit on how long a TCP close connection
// event remains stored in the eBPF map before being processed by the NetworkAgent.
type perfBatchManager struct {
    // eBPF
    batchMap *maps.GenericMap[uint32, netebpf.Batch]

    // stateByCPU contains the state of each batch.
    // The slice is indexed by the CPU core number.
    stateByCPU []percpuState

    expiredStateInterval time.Duration

    ch *cookieHasher
    batchMap *maps.GenericMap[uint32, netebpf.Batch]
    extractor *batchExtractor
    ch *cookieHasher
}

// newPerfBatchManager returns a new `PerfBatchManager` and initializes the
// eBPF map that holds the tcp_close batch objects.
func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], numCPUs uint32) (*perfBatchManager, error) {
func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], extractor *batchExtractor) (*perfBatchManager, error) {
    if batchMap == nil {
        return nil, fmt.Errorf("batchMap is nil")
    }

    state := make([]percpuState, numCPUs)
    for cpu := uint32(0); cpu < numCPUs; cpu++ {
    for cpu := uint32(0); cpu < uint32(extractor.NumCPUs()); cpu++ {
        b := new(netebpf.Batch)
        // Ring buffer events don't have CPU information, so we associate each
        // batch entry with a CPU during startup. This information is used by
@@ -59,132 +48,50 @@ func newPerfBatchManager(batchMap *maps.GenericMap[uint32, netebpf.Batch], numCP
        if err := batchMap.Put(&cpu, b); err != nil {
            return nil, fmt.Errorf("error initializing perf batch manager maps: %w", err)
        }
        state[cpu] = percpuState{
            processed: make(map[uint64]batchState),
        }
    }

    return &perfBatchManager{
        batchMap: batchMap,
        stateByCPU: state,
        expiredStateInterval: defaultExpiredStateInterval,
        ch: newCookieHasher(),
        batchMap: batchMap,
        extractor: extractor,
        ch: newCookieHasher(),
    }, nil
}

// ExtractBatchInto extracts from the given batch all connections that haven't been processed yet.
func (p *perfBatchManager) ExtractBatchInto(buffer *network.ConnectionBuffer, b *netebpf.Batch) {
    cpu := int(b.Cpu)
    if cpu >= len(p.stateByCPU) {
        return
    }

    batchID := b.Id
    cpuState := &p.stateByCPU[cpu]
    start := uint16(0)
    if bState, ok := cpuState.processed[batchID]; ok {
        start = bState.offset
    for rc := p.extractor.NextConnection(b); rc != nil; rc = p.extractor.NextConnection(b) {
        conn := buffer.Next()
        populateConnStats(conn, &rc.Tup, &rc.Conn_stats, p.ch)
        updateTCPStats(conn, &rc.Tcp_stats, rc.Tcp_retransmits)
    }

    p.extractBatchInto(buffer, b, start, netebpf.BatchSize)
    delete(cpuState.processed, batchID)
}

// GetPendingConns return all connections that are in batches that are not yet full.
// It tracks which connections have been processed by this call, by batch id.
// This prevents double-processing of connections between GetPendingConns and Extract.
func (p *perfBatchManager) GetPendingConns(buffer *network.ConnectionBuffer) {
    b := new(netebpf.Batch)
    for cpu := uint32(0); cpu < uint32(len(p.stateByCPU)); cpu++ {
        cpuState := &p.stateByCPU[cpu]

    for cpu := uint32(0); cpu < uint32(p.extractor.NumCPUs()); cpu++ {
        err := p.batchMap.Lookup(&cpu, b)
        if err != nil {
            continue
        }

        batchLen := b.Len
        if batchLen == 0 {
            continue
        }

        // have we already processed these messages?
        start := uint16(0)
        batchID := b.Id
        if bState, ok := cpuState.processed[batchID]; ok {
            start = bState.offset
        }

        p.extractBatchInto(buffer, b, start, batchLen)
        // update timestamp regardless since this partial batch still exists
        cpuState.processed[batchID] = batchState{offset: batchLen, updated: time.Now()}
    }

    p.cleanupExpiredState(time.Now())
}

type percpuState struct {
    // map of batch id -> offset of conns already processed by GetPendingConns
    processed map[uint64]batchState
}

type batchState struct {
    offset uint16
    updated time.Time
}

// ExtractBatchInto extract network.ConnectionStats objects from the given `batch` into the supplied `buffer`.
// The `start` (inclusive) and `end` (exclusive) arguments represent the offsets of the connections we're interested in.
func (p *perfBatchManager) extractBatchInto(buffer *network.ConnectionBuffer, b *netebpf.Batch, start, end uint16) {
    if start >= end || end > netebpf.BatchSize {
        return
    }

    var ct netebpf.Conn
    for i := start; i < end; i++ {
        switch i {
        case 0:
            ct = b.C0
        case 1:
            ct = b.C1
        case 2:
            ct = b.C2
        case 3:
            ct = b.C3
        default:
            panic("batch size is out of sync")
        }

        conn := buffer.Next()
        populateConnStats(conn, &ct.Tup, &ct.Conn_stats, p.ch)
        updateTCPStats(conn, &ct.Tcp_stats, ct.Tcp_retransmits)
    }
}

func (p *perfBatchManager) cleanupExpiredState(now time.Time) {
    for cpu := 0; cpu < len(p.stateByCPU); cpu++ {
        cpuState := &p.stateByCPU[cpu]
        for id, s := range cpuState.processed {
            if now.Sub(s.updated) >= p.expiredStateInterval {
                delete(cpuState.processed, id)
            }
        for rc := p.extractor.NextConnection(b); rc != nil; rc = p.extractor.NextConnection(b) {
            c := buffer.Next()
            populateConnStats(c, &rc.Tup, &rc.Conn_stats, p.ch)
            updateTCPStats(c, &rc.Tcp_stats, rc.Tcp_retransmits)
        }
    }
    p.extractor.CleanupExpiredState(time.Now())
}

func newConnBatchManager(mgr *manager.Manager) (*perfBatchManager, error) {
func newConnBatchManager(mgr *manager.Manager, extractor *batchExtractor) (*perfBatchManager, error) {
    connCloseMap, err := maps.GetMap[uint32, netebpf.Batch](mgr, probes.ConnCloseBatchMap)
    if err != nil {
        return nil, fmt.Errorf("unable to get map %s: %s", probes.ConnCloseBatchMap, err)
    }
    numCPUs, err := cebpf.PossibleCPU()
    if err != nil {
        return nil, fmt.Errorf("unable to get number of CPUs: %s", err)
    }
    if err != nil {
        return nil, err
    }
    batchMgr, err := newPerfBatchManager(connCloseMap, uint32(numCPUs))
    batchMgr, err := newPerfBatchManager(connCloseMap, extractor)
    if err != nil {
        return nil, err
    }
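With this change, newConnBatchManager no longer computes the CPU count itself; the caller builds the batchExtractor and passes it in. The actual call site lives in one of the two changed files not shown above, so the following is only a hypothetical wiring sketch (setupBatchManager is an invented name), reusing the cilium/ebpf PossibleCPU call that was removed from newConnBatchManager:

// Hypothetical wiring, not part of this commit.
func setupBatchManager(mgr *manager.Manager) (*perfBatchManager, error) {
    // Size the extractor's per-CPU state from the number of possible CPUs.
    numCPUs, err := cebpf.PossibleCPU()
    if err != nil {
        return nil, fmt.Errorf("unable to get number of CPUs: %w", err)
    }
    extractor := newBatchExtractor(numCPUs)
    return newConnBatchManager(mgr, extractor)
}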