Skip to content

Commit

Permalink
Add an option to report deltas during a flow period instead of cumula…
Browse files Browse the repository at this point in the history
…tive stats (#38223) (#38537)

* add an option to report delta flows

* adding unit test and doc update

* fix linting issues

* fix imports

* addressing review comments

(cherry picked from commit b98b763)

Co-authored-by: kvalliyurnatt <[email protected]>
  • Loading branch information
mergify[bot] and kvalliyurnatt authored Mar 21, 2024
1 parent 62a1cb5 commit 2b50d68
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 21 deletions.
2 changes: 2 additions & 0 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type Flows struct {
KeepNull bool `config:"keep_null"`
// Index is used to overwrite the index where flows are published
Index string `config:"index"`
// EnableDeltaFlowReports when enabled will report flow network stats (bytes, packets) as delta values
EnableDeltaFlowReports bool `config:"enable_delta_flow_reports"`
}

type ProtocolCommon struct {
Expand Down
6 changes: 6 additions & 0 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ in time. Periodical reporting can be disabled by setting the value to -1. If
disabled, flows are still reported once being timed out. The default value is
10s.

[float]
==== `enable_delta_flow_reports`

Configure network.bytes and network.packets to be a delta
value instead of a cumulative sum for each flow period. The default value is false.

[float]
[[packetbeat-configuration-flows-fields]]
==== `fields`
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/flows/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow

counter := &counterReg{}

worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period)
worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, config.EnableDeltaFlowReports)
if err != nil {
logp.Err("failed to configure flows processing intervals: %v", err)
return nil, err
Expand Down
38 changes: 23 additions & 15 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) {
// reporting will be done at flow lifetime end.
// Flows are published via the pub Reporter after being enriched with process information
// by watcher.
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) {
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableDeltaFlowReports bool) (*worker, error) {
if timeout < time.Second {
return nil, ErrInvalidTimeout
}
Expand Down Expand Up @@ -161,10 +161,11 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe

defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
enableDeltaFlowReporting: enableDeltaFlowReports,
}
processor.spool.init(pub, defaultBatchSize)

Expand Down Expand Up @@ -221,11 +222,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i
}

// flowsProcessor periodically sweeps the flow table, turning each tracked
// flow into an event and queueing it on the spool for publication.
type flowsProcessor struct {
	spool    spool                   // batching spool that events are published through
	watcher  *procs.ProcessesWatcher // enriches flow events with local process information
	table    *flowMetaTable          // registry of currently tracked flows
	counters *counterReg             // registered per-flow counter names
	timeout  time.Duration           // inactivity timeout after which a flow is considered over
	// enableDeltaFlowReporting, when true, resets the bytes/packets
	// counters after each report so that stats are reported as deltas
	// per flow period instead of cumulative totals.
	enableDeltaFlowReporting bool
}

func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) {
Expand Down Expand Up @@ -281,13 +283,13 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
}

// report converts a single flow into a beat.Event and queues it on the
// processor's spool for publication. The fw.enableDeltaFlowReporting flag is
// forwarded to createEvent, which (per its stats-encoding logic) zeroes the
// flow's bytes/packets counters after encoding so the next report carries
// only the traffic seen during this flow period.
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) {
	event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, fw.enableDeltaFlowReporting)

	debugf("add event: %v", event)
	fw.spool.publish(event)
}

func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event {
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event {
timestamp := ts

event := mapstr.M{
Expand Down Expand Up @@ -418,7 +420,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve
var totalBytes, totalPackets uint64
if f.stats[0] != nil {
// Source stats.
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames)
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode":
Expand Down Expand Up @@ -449,7 +451,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve
}
if f.stats[1] != nil {
// Destination stats.
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames)
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode", "icmpV6TypeCode":
Expand Down Expand Up @@ -533,7 +535,7 @@ func formatHardwareAddr(addr net.HardwareAddr) string {
return string(buf)
}

func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]interface{} {
func encodeStats(stats *flowStats, ints, uints, floats []string, enableDeltaFlowReporting bool) map[string]interface{} {
report := make(map[string]interface{})

i := 0
Expand All @@ -551,6 +553,12 @@ func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]inte
for m := mask; m != 0; m >>= 1 {
if (m & 1) == 1 {
report[uints[i]] = stats.uints[i]
if enableDeltaFlowReporting && (uints[i] == "bytes" || uints[i] == "packets") {
// If Delta Flow Reporting is enabled, reset bytes and packets at each period.
// Only the bytes and packets received during the flow period will be reported.
// This should be thread-safe as it is called under the flowMetaTable lock.
stats.uints[i] = 0
}
}
i++
}
Expand Down
41 changes: 36 additions & 5 deletions packetbeat/flows/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"encoding/json"
"flag"
"os"
"reflect"
"testing"
"time"

"github.com/elastic/go-lookslike/isdef"

"github.com/elastic/go-lookslike"
"gotest.tools/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
)

// Use `go test -data` to update sample event files.
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestCreateEvent(t *testing.T) {
}
bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}}
bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}}
event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil)
event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, false)

// Validate the contents of the event.
validate := lookslike.MustCompile(map[string]interface{}{
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestCreateEvent(t *testing.T) {

// Write the event to disk if -data is used.
if *dataFlag {
event.Fields.Put("@timestamp", common.Time(end)) //nolint:errcheck // Never fails.
event.Fields.Put("@timestamp", common.Time(end))
output, err := json.MarshalIndent(&event.Fields, "", " ")
if err != nil {
t.Fatal(err)
Expand All @@ -126,4 +127,34 @@ func TestCreateEvent(t *testing.T) {
t.Fatal(err)
}
}

// when enableDeltaFlowReporting is true, the flow stats should be reset
expectbiFlow := &biFlow{
id: id.rawFlowID,
killed: 1,
createTS: start,
ts: end,
dir: flowDirForward,
}
expectbiFlow.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}}
expectbiFlow.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}}

// Assert the biflow is not 0 before the test
assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[0].uints, bif.stats[0].uints))
assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[1].uints, bif.stats[1].uints))

event = createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, true)
result = validate(event.Fields)
if errs := result.Errors(); len(errs) > 0 {
for _, err := range errs {
t.Error(err)
}
t.FailNow()
}

// Assert the biflow is 0 after the test
assert.DeepEqual(t, expectbiFlow.stats[0].uintFlags, bif.stats[0].uintFlags)
assert.DeepEqual(t, expectbiFlow.stats[0].uints, bif.stats[0].uints)
assert.DeepEqual(t, expectbiFlow.stats[1].uintFlags, bif.stats[1].uintFlags)
assert.DeepEqual(t, expectbiFlow.stats[1].uints, bif.stats[1].uints)
}

0 comments on commit 2b50d68

Please sign in to comment.