diff --git a/README.md b/README.md index 0c263156f..ab6373193 100644 --- a/README.md +++ b/README.md @@ -402,20 +402,24 @@ parameters: network: KubeConfigPath: /tmp/config rules: - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /24 - - input: dstPort - output: service - type: add_service - parameters: protocol - - input: dstIP - output: dstLocation - type: add_location - - input: srcIP - output: srcK8S - type: add_kubernetes + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + subnet_mask: /24 + - type: add_service + add_service: + input: dstPort + output: service + protocol: protocol + - type: add_location + add_location: + input: dstIP + output: dstLocation + - type: add_kubernetes + kubernetes: + input: srcIP + output: srcK8S ``` The rule `add_subnet` generates a new field named `srcSubnet` with the diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 6ff7500be..f597decea 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -117,37 +117,40 @@ parameters: type: network network: rules: - - input: dstPort - output: service - type: add_service - parameters: proto - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 - - input: srcIP - output: srcK8S - type: add_kubernetes - parameters: srcK8S_labels - - input: bytes - output: all - type: add_if - parameters: '>=0' - - input: dstIP - output: dstLocation - type: add_location + - type: add_service + add_service: + input: dstPort + output: service + protocol: proto + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + subnet_mask: /16 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet + subnet_mask: /16 + - type: add_kubernetes + kubernetes: + input: srcIP + output: srcK8S + labels_prefix: srcK8S_labels + - type: add_location + add_location: + input: dstIP + output: dstLocation - name: extract_aggregate extract: type: aggregates @@ -252,6 +255,7 @@ parameters: encode: type: prom prom: + promconnectioninfo: null metrics: - name: bandwidth_per_network_service type: counter diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml index 7a9a3d038..5f3067d25 100644 --- a/network_definitions/bandwidth_per_network_service.yaml +++ b/network_definitions/bandwidth_per_network_service.yaml @@ -12,10 +12,11 @@ tags: - network-service transform: rules: - - input: dstPort - output: service - type: add_service - parameters: proto + - type: add_service + add_service: + input: dstPort + output: service + protocol: proto extract: type: aggregates aggregates: diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml index e60237cf1..363d6c170 100644 --- a/network_definitions/bandwidth_per_src_dest_subnet.yaml +++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml @@ -12,14 +12,16 @@ tags: - subnet transform: rules: - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet24 + subnet_mask: /24 extract: type: aggregates aggregates: diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml index 557fffe6b..22754bb3a 100644 --- a/network_definitions/bandwidth_per_src_subnet.yaml +++ b/network_definitions/bandwidth_per_src_subnet.yaml @@ -12,10 +12,11 @@ tags: - subnet transform: rules: - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + subnet_mask: /16 extract: type: aggregates aggregates: diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index 2056aaa99..b3b2c6567 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -10,10 +10,11 @@ tags: - subnet transform: rules: - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet + subnet_mask: /16 extract: type: aggregates aggregates: diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml index 74b2d3446..ceaeae071 100644 --- a/network_definitions/connection_rate_per_src_subnet.yaml +++ b/network_definitions/connection_rate_per_src_subnet.yaml @@ -10,10 +10,11 @@ tags: - subnet transform: rules: - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + subnet_mask: /16 extract: type: aggregates aggregates: diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml index 2da29a792..97827ac0a 100644 --- a/network_definitions/count_per_src_dest_subnet.yaml +++ b/network_definitions/count_per_src_dest_subnet.yaml @@ -12,14 +12,16 @@ tags: - subnet transform: rules: - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet24 + subnet_mask: /24 extract: type: aggregates aggregates: diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml index ca9fe6dd2..c02f75e97 100644 --- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml +++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml @@ -12,10 +12,11 @@ tags: - subnet transform: rules: - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet + subnet_mask: /16 extract: type: aggregates aggregates: diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml index 69ea576f4..4cf346d9f 100644 --- a/network_definitions/egress_bandwidth_per_namespace.yaml +++ b/network_definitions/egress_bandwidth_per_namespace.yaml @@ -11,10 +11,11 @@ tags: - graph transform: rules: - - input: srcIP - output: srcK8S - type: add_kubernetes - parameters: srcK8S_labels + - type: add_kubernetes + kubernetes: + input: srcIP + output: srcK8S + labels_prefix: srcK8S_labels extract: type: aggregates aggregates: diff --git a/network_definitions/flows_length_histogram.yaml b/network_definitions/flows_length_histogram.yaml index 5ad3e668f..d3968db4f 100644 --- a/network_definitions/flows_length_histogram.yaml +++ b/network_definitions/flows_length_histogram.yaml @@ -10,12 +10,6 @@ tags: - mice - elephant - rate -transform: - rules: - - input: bytes - output: all - type: add_if - parameters: ">=0" extract: type: aggregates aggregates: diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml index 5baa5da19..e6761c9de 100644 --- a/network_definitions/geo-location_rate_per_dest.yaml +++ b/network_definitions/geo-location_rate_per_dest.yaml @@ -12,9 +12,10 @@ tags: - destinationIP transform: rules: - - input: dstIP - output: dstLocation - type: add_location + - type: add_location + add_location: + input: dstIP + output: dstLocation extract: type: aggregates aggregates: diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml index 712a5449c..2e658e4ab 100644 --- a/network_definitions/network_services_count.yaml +++ b/network_definitions/network_services_count.yaml @@ -12,10 +12,11 @@ tags: - destination-protocol transform: rules: - - input: dstPort - output: service - type: add_service - parameters: proto + - type: add_service + add_service: + input: dstPort + output: service + protocol: proto extract: type: aggregates aggregates: diff --git a/pkg/api/api.go b/pkg/api/api.go index 83f921e86..93535c27a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -43,7 +43,6 @@ const ( ConnTrackType = "conntrack" NoneType = "none" AddRegExIfRuleType = "add_regex_if" - AddIfRuleType = "add_if" AddSubnetRuleType = "add_subnet" AddLocationRuleType = "add_location" AddServiceRuleType = "add_service" diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 001dab3f7..1e54672f9 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -54,7 +54,7 @@ type MetricsItem struct { ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` - ValueScale float64 `yaml:"valueScale" json:"valueScale" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` + ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` } type MetricsItems []MetricsItem @@ -62,7 +62,7 @@ type MetricsItems []MetricsItem type MetricsFilter struct { Key string `yaml:"key" json:"key" doc:"the key to match and filter by"` Value string `yaml:"value" json:"value" doc:"the value to match and filter by"` - Type string `yaml:"type" json:"type" enum:"MetricEncodeFilterTypeEnum" doc:"the type of filter match: equal (default), not_equal, presence, absence, match_regex or not_match_regex"` + Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"MetricEncodeFilterTypeEnum" doc:"the type of filter match: equal (default), not_equal, presence, absence, match_regex or not_match_regex"` } type MetricEncodeFilterTypeEnum struct { diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 00267e604..b4f111740 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -180,31 +180,31 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error { Tags: defFile.Tags, } - // parse transport - definition.TransformNetwork, err = cg.parseTransport(&defFile.Transform) + // parse transform + definition.TransformNetwork, err = cg.parseTransform(&defFile.Transform) if err != nil { - log.Debugf("parseTransport err: %v ", err) + log.Debugf("%s: parseTransform err: %v ", name, err) return err } // parse extract definition.Aggregates, definition.ExtractTimebased, err = cg.parseExtract(&defFile.Extract) if err != nil { - log.Debugf("parseExtract err: %v ", err) + log.Debugf("%s: parseExtract err: %v ", name, err) return err } // parse encode definition.PromEncode, err = cg.parseEncode(&defFile.Encode, len(definition.Aggregates.Rules) > 0) if err != nil { - log.Debugf("parseEncode err: %v ", err) + log.Debugf("%s: parseEncode err: %v ", name, err) return err } // parse visualization definition.Visualization, err = cg.parseVisualization(&defFile.Visualization) if err != nil { - log.Debugf("cg.parseVisualization err: %v ", err) + log.Debugf("%s: cg.parseVisualization err: %v ", name, err) return err } diff --git a/pkg/confgen/transform.go b/pkg/confgen/transform.go index a6446f646..a4b40d1d6 100644 --- a/pkg/confgen/transform.go +++ b/pkg/confgen/transform.go @@ -24,7 +24,7 @@ import ( log "github.com/sirupsen/logrus" ) -func (cg *ConfGen) parseTransport(transform *map[string]interface{}) (*api.TransformNetwork, error) { +func (cg *ConfGen) parseTransform(transform *map[string]interface{}) (*api.TransformNetwork, error) { jsoniterJSON := jsoniter.ConfigCompatibleWithStandardLibrary b, err := jsoniterJSON.Marshal(transform) if err != nil { diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 95989fa51..4b041b995 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -170,7 +170,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[4]) require.NoError(t, err) - require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","type":"","value":"src_as_connection_count"}],"valueKey":"recent_count","valueScale":0,"labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b)) + require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filters":[{"key":"name","value":"src_as_connection_count"}],"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b)) } func TestForkPipeline(t *testing.T) {