diff --git a/docs/api.md b/docs/api.md index acb46e414..34f838586 100644 --- a/docs/api.md +++ b/docs/api.md @@ -241,6 +241,7 @@ Following is the supported API format for specifying connection tracking: last: last splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows. input: The input field to base the operation on. When omitted, 'name' is used + reportMissing: When true, missing input will produce MissingFieldError metric and error logs scheduling: list of timeouts and intervals to apply per selector selector: key-value map to match against connection fields to apply this scheduling endConnectionTimeout: duration of time to wait from the last flow log to end a connection diff --git a/pkg/api/conntrack.go b/pkg/api/conntrack.go index 01bf30779..ff18998b7 100644 --- a/pkg/api/conntrack.go +++ b/pkg/api/conntrack.go @@ -70,10 +70,11 @@ type ConnTrackHash struct { } type OutputField struct { - Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"` - Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"` - SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."` - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"` + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"` + Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"` + SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"` + ReportMissing bool `yaml:"reportMissing,omitempty" json:"reportMissing,omitempty" doc:"When true, missing input will produce MissingFieldError metric and error logs"` } type ConnTrackOperationEnum struct { diff --git a/pkg/pipeline/decode/decode_protobuf.go b/pkg/pipeline/decode/decode_protobuf.go index f2f1ead1a..756d6cebe 100644 --- a/pkg/pipeline/decode/decode_protobuf.go +++ b/pkg/pipeline/decode/decode_protobuf.go @@ -40,13 +40,11 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap { } out := config.GenericMap{ "FlowDirection": int(flow.Direction.Number()), - "Bytes": flow.Bytes, "SrcAddr": ipToStr(flow.Network.GetSrcAddr()), "DstAddr": ipToStr(flow.Network.GetDstAddr()), "SrcMac": macToStr(flow.DataLink.GetSrcMac()), "DstMac": macToStr(flow.DataLink.GetDstMac()), "Etype": flow.EthProtocol, - "Packets": flow.Packets, "Duplicate": flow.Duplicate, "Proto": flow.Transport.GetProtocol(), "TimeFlowStartMs": flow.TimeFlowStart.AsTime().UnixMilli(), @@ -56,6 +54,14 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap { "AgentIP": ipToStr(flow.AgentIp), } + if flow.Bytes != 0 { + out["Bytes"] = flow.Bytes + } + + if flow.Packets != 0 { + out["Packets"] = flow.Packets + } + proto := flow.Transport.GetProtocol() if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 { out["IcmpType"] = flow.GetIcmpType() diff --git a/pkg/pipeline/extract/conntrack/aggregator.go b/pkg/pipeline/extract/conntrack/aggregator.go index 1dd35d4d8..3e84d7a52 100644 --- a/pkg/pipeline/extract/conntrack/aggregator.go +++ b/pkg/pipeline/extract/conntrack/aggregator.go @@ -37,11 +37,12 @@ type aggregator interface { } type aggregateBase struct { - inputField string - outputField string - splitAB bool - initVal interface{} - metrics *metricsType + inputField string + outputField string + splitAB bool + initVal interface{} + metrics *metricsType + reportMissing bool } type aSum struct{ aggregateBase } @@ -64,7 +65,7 @@ func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error) } else { inputField = of.Name } - aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics} + aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics, reportMissing: of.ReportMissing} var agg aggregator switch of.Operation { case api.ConnTrackOperationName("Sum"): @@ -109,10 +110,15 @@ func (agg *aggregateBase) getOutputField(d direction) string { func (agg *aggregateBase) getInputFieldValue(flowLog config.GenericMap) (float64, error) { rawValue, ok := flowLog[agg.inputField] if !ok { - if agg.metrics != nil { - agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc() + // error only if explicitly specified as FLP skip empty fields by default to reduce storage size + if agg.reportMissing { + if agg.metrics != nil { + agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc() + } + return 0, fmt.Errorf("missing field %v", agg.inputField) } - return 0, fmt.Errorf("missing field %v", agg.inputField) + // fallback on 0 without error + return 0, nil } floatValue, err := utils.ConvertToFloat64(rawValue) if err != nil { @@ -185,5 +191,7 @@ func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction } func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) { - conn.updateAggValue(cp.outputField, flowLog[cp.inputField]) + if flowLog[cp.inputField] != nil { + conn.updateAggValue(cp.outputField, flowLog[cp.inputField]) + } } diff --git a/pkg/pipeline/extract/conntrack/aggregator_test.go b/pkg/pipeline/extract/conntrack/aggregator_test.go index 1a48e98b2..21b9eb948 100644 --- a/pkg/pipeline/extract/conntrack/aggregator_test.go +++ b/pkg/pipeline/extract/conntrack/aggregator_test.go @@ -65,52 +65,52 @@ func TestNewAggregator_Valid(t *testing.T) { { name: "Default SplitAB", outputField: api.OutputField{Name: "MyAgg", Operation: "sum"}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, false}}, }, { name: "Default input", outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil}}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil, false}}, }, { name: "Custom input", outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"}, - expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil}}, + expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil, false}}, }, { - name: "OperationType sum", - outputField: api.OutputField{Name: "MyAgg", Operation: "sum"}, - expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}}, + name: "OperationType sum with errors", + outputField: api.OutputField{Name: "MyAgg", Operation: "sum", ReportMissing: true}, + expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}}, }, { - name: "OperationType count", - outputField: api.OutputField{Name: "MyAgg", Operation: "count"}, - expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}}, + name: "OperationType count with errors", + outputField: api.OutputField{Name: "MyAgg", Operation: "count", ReportMissing: true}, + expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}}, }, { name: "OperationType max", outputField: api.OutputField{Name: "MyAgg", Operation: "max"}, - expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil}}, + expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil, false}}, }, { name: "OperationType min", outputField: api.OutputField{Name: "MyAgg", Operation: "min"}, - expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil}}, + expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil, false}}, }, { name: "Default first", outputField: api.OutputField{Name: "MyCp", Operation: "first"}, - expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil}}, + expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}}, }, { name: "Custom input first", outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"}, - expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil}}, + expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil, false}}, }, { name: "Default last", outputField: api.OutputField{Name: "MyCp", Operation: "last"}, - expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil}}, + expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}}, }, } @@ -132,6 +132,7 @@ func TestAddField_and_Update(t *testing.T) { {Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"}, {Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"}, {Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"}, + {Name: "PktDropLatestDropCause", Operation: "last", Input: "PktDropLatestDropCause"}, } var aggs []aggregator for _, of := range ofs { @@ -158,13 +159,49 @@ func TestAddField_and_Update(t *testing.T) { name: "flowLog 1", flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false), direction: dirAB, - expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0}, + expected: map[string]interface{}{ + "Bytes_AB": float64(100), + "Bytes_BA": float64(0), + "Packets": float64(10), + "maxFlowLogBytes": float64(100), + "minFlowLogBytes": float64(100), + "numFlowLogs": float64(1), + "FirstFlowDirection": 0, + "LastFlowDirection": 0, + "PktDropLatestDropCause": nil, + }, }, { name: "flowLog 2", - flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false), + flowLog: config.GenericMap{"SrcAddr": ipA, "DstAddr": ipB, "Bytes": 100, "FlowDirection": flowDirA, "PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET"}, + direction: dirAB, + expected: map[string]interface{}{ + "Bytes_AB": float64(200), // updated bytes count + "Bytes_BA": float64(0), + "Packets": float64(10), + "maxFlowLogBytes": float64(100), + "minFlowLogBytes": float64(100), + "numFlowLogs": float64(2), // updated flow count + "FirstFlowDirection": 0, + "LastFlowDirection": 0, + "PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // added drop cause + }, + }, + { + name: "flowLog 3", + flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 300, 20, false), direction: dirBA, - expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1}, + expected: map[string]interface{}{ + "Bytes_AB": float64(200), + "Bytes_BA": float64(300), // updated reverse direction byte count + "Packets": float64(30), + "maxFlowLogBytes": float64(300), // updated max bytes from any direction + "minFlowLogBytes": float64(100), + "numFlowLogs": float64(3), // updated count + "FirstFlowDirection": 0, + "LastFlowDirection": 1, + "PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // missing field is kept to its last available value + }, }, } @@ -172,7 +209,17 @@ func TestAddField_and_Update(t *testing.T) { for _, agg := range aggs { agg.addField(conn) } - expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil} + expectedInits := map[string]interface{}{ + "Bytes_AB": float64(0), + "Bytes_BA": float64(0), + "Packets": float64(0), + "maxFlowLogBytes": float64(-math.MaxFloat64), + "minFlowLogBytes": float64(math.MaxFloat64), + "numFlowLogs": float64(0), + "FirstFlowDirection": nil, + "LastFlowDirection": nil, + "PktDropLatestDropCause": nil, + } require.Equal(t, expectedInits, conn.(*connType).aggFields) for i, test := range table { @@ -188,7 +235,7 @@ func TestAddField_and_Update(t *testing.T) { func TestMissingFieldError(t *testing.T) { test.ResetPromRegistry() metrics := newMetrics(opMetrics) - agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics) + agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true, ReportMissing: true}, metrics) require.NoError(t, err) conn := NewConnBuilder(metrics).Build() @@ -201,6 +248,22 @@ func TestMissingFieldError(t *testing.T) { require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`) } +func TestSkipMissingFieldError(t *testing.T) { + test.ResetPromRegistry() + metrics := newMetrics(opMetrics) + agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics) + require.NoError(t, err) + + conn := NewConnBuilder(metrics).Build() + agg.addField(conn) + + flowLog := config.GenericMap{} + agg.update(conn, flowLog, dirAB, true) + + exposed := test.ReadExposedMetrics(t) + require.NotContains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"}`) +} + func TestFloat64ConversionError(t *testing.T) { test.ResetPromRegistry() metrics := newMetrics(opMetrics) diff --git a/pkg/pipeline/extract/conntrack/conn.go b/pkg/pipeline/extract/conntrack/conn.go index 8cf00ff87..d3b9594e6 100644 --- a/pkg/pipeline/extract/conntrack/conn.go +++ b/pkg/pipeline/extract/conntrack/conn.go @@ -19,6 +19,7 @@ package conntrack import ( "fmt" + "reflect" "time" "github.com/netobserv/flowlogs-pipeline/pkg/utils" @@ -100,7 +101,9 @@ func (c *connType) getNextHeartbeatTime() time.Time { func (c *connType) toGenericMap() config.GenericMap { gm := config.GenericMap{} for k, v := range c.aggFields { - gm[k] = v + if v != nil && (reflect.TypeOf(v).Kind() != reflect.Float64 || v.(float64) != 0) { + gm[k] = v + } } // In case of a conflict between the keys and the aggFields / cpFields, the keys should prevail. diff --git a/pkg/pipeline/extract/conntrack/utils_test.go b/pkg/pipeline/extract/conntrack/utils_test.go index e7cb87c18..0f488db58 100644 --- a/pkg/pipeline/extract/conntrack/utils_test.go +++ b/pkg/pipeline/extract/conntrack/utils_test.go @@ -59,14 +59,26 @@ func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, p "DstAddr": dstIP, "DstPort": dstPort, "Proto": protocol, - "Bytes_AB": bytesAB, - "Bytes_BA": bytesBA, - "Packets_AB": packetsAB, - "Packets_BA": packetsBA, "numFlowLogs": numFlowLogs, api.IsFirstFieldName: false, }, } + + if bytesAB != 0 { + mock.record["Bytes_AB"] = bytesAB + } + + if bytesBA != 0 { + mock.record["Bytes_BA"] = bytesBA + } + + if bytesAB != 0 { + mock.record["Packets_AB"] = packetsAB + } + + if bytesBA != 0 { + mock.record["Packets_BA"] = packetsBA + } return mock }