diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index fca40f8e9..bc714d839 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -50,7 +50,7 @@ func TestPipelineConfigSetup(t *testing.T) { js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", - "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstAddr\",\"output\":\"DstK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstPort\",\"output\":\"Service\",\"parameters\":\"Proto\",\"type\":\"add_service\"},{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"parameters\":\"/16\",\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", + "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", "Health": { "Port": "8080" }, diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 1de44f89a..0b7313109 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -64,22 +64,32 @@ func TransformNetworkOperationName(operation string) string { type NetworkTransformRule struct { Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"` - Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` - KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule specific configuration"` - Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule specific configuration"` - AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule specific configuration"` - AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule specific configuration"` - AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule specific configuration"` + KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"` + Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"` + AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"` + AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"` + AddIPCategory *NetworkGenericRule `yaml:"add_ip_category,omitempty" json:"add_ip_category,omitempty" doc:"Add ip category rule configuration"` + AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"` } type K8sInfraRule struct { - Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - InfraPrefix []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"` + Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + InfraPrefix []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"` + InfraRefs []K8sReference `yaml:"infra_refs,omitempty" json:"infra_refs,omitempty" doc:"Additional object references to be tagged as infra"` +} + +type K8sReference struct { + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the object"` + Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty" doc:"namespace of the object"` } type K8sRule struct { - AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` + LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` + AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"` } type NetworkGenericRule struct { @@ -90,7 +100,7 @@ type NetworkGenericRule struct { type NetworkAddSubnetRule struct { Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - SubnetMask string `yaml:"protocol,omitempty" json:"protocol,omitempty" doc:"entry protocol field"` + SubnetMask string `yaml:"subnet_mask,omitempty" json:"subnet_mask,omitempty" doc:"subnet mask field"` } type NetworkAddServiceRule struct { diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 4e2b0b975..a36dee60d 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -124,10 +124,12 @@ func Test_RunShortConfGen(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[1].Transform.Network.Rules[0]) // Expects aggregates @@ -212,10 +214,12 @@ func Test_RunConfGenNoAgg(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[1].Transform.Network.Rules[0]) // Expects prom encode @@ -299,10 +303,12 @@ func Test_RunLongConfGen(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[2].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[2].Transform.Network.Rules[0]) // Expects aggregates diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index 8a069d2d6..4d0e5de5f 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -29,25 +29,26 @@ func (cg *ConfGen) dedupe() { cg.aggregates.Rules = dedupeAggregateDefinitions(cg.aggregates.Rules) } -type void struct{} - -var voidMember void - func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTransformRules { - // There are no built-in sets in go - //https://stackoverflow.com/a/34020023/2749989 - uniqueSet := make(map[api.NetworkTransformRule]void) - var dedpueSlice []api.NetworkTransformRule + var dedupeSlice []api.NetworkTransformRule for i, rule := range rules { - if _, exists := uniqueSet[rule]; exists { - // duplicate rule - log.Debugf("Remove duplicate NetworkTransformRule %v at index %v", rule, i) + if containsNetworkTransformRule(dedupeSlice, rule) { + // duplicate aggregateDefinition + log.Debugf("Remove duplicate transformation rule %v at index %v", rule, i) continue } - uniqueSet[rule] = voidMember - dedpueSlice = append(dedpueSlice, rule) + dedupeSlice = append(dedupeSlice, rule) } - return dedpueSlice + return dedupeSlice +} + +func containsNetworkTransformRule(slice []api.NetworkTransformRule, rule api.NetworkTransformRule) bool { + for _, item := range slice { + if reflect.DeepEqual(item, rule) { + return true + } + } + return false } // dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates. diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 8795c163f..70bddae15 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -26,15 +26,15 @@ import ( func Test_dedupeNetworkTransformRules(t *testing.T) { slice := api.NetworkTransformRules{ - api.NetworkTransformRule{Input: "i1", Output: "o1"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, - api.NetworkTransformRule{Input: "i3", Output: "o3"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, } expected := api.NetworkTransformRules{ - api.NetworkTransformRule{Input: "i1", Output: "o1"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, - api.NetworkTransformRule{Input: "i3", Output: "o3"}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, } actual := dedupeNetworkTransformRules(slice) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index d2ef9ebec..12ac230cc 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -29,13 +29,17 @@ import ( func TestLokiPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, - Input: "SrcAddr", - Output: "SrcK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8S", + }, }, { - Type: api.AddKubernetesRuleType, - Input: "DstAddr", - Output: "DstK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8S", + }, }}}) pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) stages := pl.GetStages() @@ -54,7 +58,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -199,13 +203,17 @@ func TestForkPipeline(t *testing.T) { func TestIPFIXPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, - Input: "SrcAddr", - Output: "SrcK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8S", + }, }, { - Type: api.AddKubernetesRuleType, - Input: "DstAddr", - Output: "DstK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8S", + }, }}}) pl = pl.WriteIpfix("ipfix", api.WriteIpfix{ TargetHost: "ipfix-receiver-test", @@ -229,7 +237,7 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 7e220c462..c62d62410 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -2,6 +2,7 @@ package kubernetes import ( "fmt" + "strings" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -20,7 +21,7 @@ func InitFromConfig(kubeConfigPath string) error { return informers.InitFromConfig(kubeConfigPath) } -func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { +func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) @@ -36,9 +37,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { outputEntry[rule.Output+"_Type"] = kubeInfo.Type outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type - if rule.Parameters != "" { + if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"_"+labelKey] = labelValue + outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue } } if kubeInfo.HostIP != "" { @@ -70,9 +71,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type - if rule.Parameters != "" { + if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"."+labelKey] = labelValue + outputEntry[rule.LabelsPrefix+"."+labelKey] = labelValue } } if kubeInfo.HostIP != "" { @@ -87,8 +88,8 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { const nodeZoneLabelName = "topology.kubernetes.io/zone" -func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo inf.Info, zonePrefix string) { - if rule.Kubernetes == nil || !rule.Kubernetes.AddZone { +func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo inf.Info, zonePrefix string) { + if !rule.AddZone { //Nothing to do return } @@ -119,39 +120,34 @@ func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, } } -func EnrichLayer(outputEntry config.GenericMap, rule api.NetworkTransformRule) { - if rule.KubernetesInfra == nil { - logrus.Error("transformation rule: Missing Kubernetes Infra configuration ") - return - } - outputEntry[rule.KubernetesInfra.Output] = "infra" - for _, input := range rule.KubernetesInfra.Inputs { - if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) { - outputEntry[rule.KubernetesInfra.Output] = "app" +func EnrichLayer(outputEntry config.GenericMap, rule api.K8sInfraRule) { + outputEntry[rule.Output] = "infra" + for _, input := range rule.Inputs { + if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule) { + outputEntry[rule.Output] = "app" return } } } -const openshiftNamespacePrefix = "openshift-" -const openshiftPrefixLen = len(openshiftNamespacePrefix) - -func objectIsApp(addr string, additionalInfraPrefix string) bool { +func objectIsApp(addr string, rule api.K8sInfraRule) bool { obj, err := informers.GetInfo(addr) if err != nil { logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr) return false } - nsLen := len(obj.Namespace) - additionalPrefixLen := len(additionalInfraPrefix) - if nsLen == 0 { + if len(obj.Namespace) == 0 { return false } - if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix { - return false + for _, prefix := range rule.InfraPrefix { + if strings.HasPrefix(obj.Namespace, prefix) { + return false + } } - if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix { - return false + for _, ref := range rule.InfraRefs { + if obj.Namespace == ref.Namespace && obj.Name == ref.Name { + return false + } } //Special case with openshift and kubernetes service in default namespace if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") { diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index a3e7a4658..d38bf7c34 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -63,18 +63,18 @@ var nodes = map[string]*inf.Info{ var rules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, - Input: "SrcAddr", - Output: "SrcK8s", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8s", AddZone: true, }, }, { - Type: api.OpAddKubernetes, - Input: "DstAddr", - Output: "DstK8s", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8s", AddZone: true, }, }, @@ -89,7 +89,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "42.42.42.42", // unknown } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "42.42.42.42", @@ -110,7 +110,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "10.0.0.2", // pod-2 } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "10.0.0.2", @@ -139,7 +139,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "20.0.0.1", // service-1 } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "20.0.0.1", @@ -162,21 +162,21 @@ func TestEnrich(t *testing.T) { var otelRules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, - Input: "source.ip", - Output: "source.", - Assignee: "otel", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ - AddZone: true, + Input: "source.ip", + Output: "source.", + Assignee: "otel", + AddZone: true, }, }, { - Type: api.OpAddKubernetes, - Input: "destination.ip", - Output: "destination.", - Assignee: "otel", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ - AddZone: true, + Input: "destination.ip", + Output: "destination.", + Assignee: "otel", + AddZone: true, }, }, } @@ -190,7 +190,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "42.42.42.42", // unknown } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "42.42.42.42", @@ -213,7 +213,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "10.0.0.2", // pod-2 } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "10.0.0.2", @@ -246,7 +246,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "20.0.0.1", // service-1 } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "20.0.0.1", @@ -282,7 +282,7 @@ func TestEnrich_EmptyNamespace(t *testing.T) { } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.NotContains(t, entry, "SrcK8s_Namespace") diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index ec1572348..a92be790e 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -87,7 +87,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo log.Errorf("Missing add service configuration") continue } - protocol := fmt.Sprintf("%v", outputEntry[rule.Parameters]) + protocol := fmt.Sprintf("%v", outputEntry[rule.AddService.Protocol]) portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.AddService.Input])) if err != nil { log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.AddService.Input], err) @@ -110,19 +110,27 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo } outputEntry[rule.AddService.Output] = serviceName case api.OpAddKubernetes: - kubernetes.Enrich(outputEntry, rule) + kubernetes.Enrich(outputEntry, *rule.Kubernetes) case api.OpAddKubernetesInfra: - kubernetes.EnrichLayer(outputEntry, rule) + if rule.KubernetesInfra == nil { + logrus.Error("transformation rule: Missing configuration ") + continue + } + kubernetes.EnrichLayer(outputEntry, *rule.KubernetesInfra) case api.OpReinterpretDirection: reinterpretDirection(outputEntry, &n.DirectionInfo) case api.OpAddIPCategory: - if strIP, ok := outputEntry[rule.Input].(string); ok { + if rule.AddIPCategory == nil { + logrus.Error("AddIPCategory rule: Missing configuration ") + continue + } + if strIP, ok := outputEntry[rule.AddIPCategory.Input].(string); ok { cat, ok := n.ipCatCache.GetCacheEntry(strIP) if !ok { cat = n.categorizeIP(net.ParseIP(strIP)) n.ipCatCache.UpdateCacheEntry(strIP, cat) } - outputEntry[rule.Output] = cat + outputEntry[rule.AddIPCategory.Output] = cat } default: diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 58a2409f0..21da21cbf 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -33,45 +33,59 @@ import ( func getMockNetworkTransformRules() api.NetworkTransformRules { return api.NetworkTransformRules{ api.NetworkTransformRule{ - Input: "srcIP", - Output: "subnet16SrcIP", - Type: "add_subnet", - Parameters: "/16", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "srcIP", + Output: "subnet16SrcIP", + SubnetMask: "/16", + }, }, api.NetworkTransformRule{ - Input: "srcIP", - Output: "subnet24SrcIP", - Type: "add_subnet", - Parameters: "/24", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "srcIP", + Output: "subnet24SrcIP", + SubnetMask: "/24", + }, }, api.NetworkTransformRule{ - Input: "emptyIP", - Output: "cidr_fail_skip", - Type: "add_subnet", - Parameters: "/16", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "emptyIP", + Output: "cidr_fail_skip", + SubnetMask: "/16", + }, }, api.NetworkTransformRule{ - Input: "dstPort", - Output: "service", - Type: "add_service", - Parameters: "protocol", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "dstPort", + Output: "service", + Protocol: "protocol", + }, }, api.NetworkTransformRule{ - Input: "dstPort", - Output: "service_protocol_num", - Type: "add_service", - Parameters: "protocol_num", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "dstPort", + Output: "service_protocol_num", + Protocol: "protocol_num", + }, }, api.NetworkTransformRule{ - Input: "srcPort", - Output: "unknown_service", - Type: "add_service", - Parameters: "protocol", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "srcPort", + Output: "unknown_service", + Protocol: "protocol", + }, }, api.NetworkTransformRule{ - Input: "8888IP", - Output: "8888IP_location", - Type: "add_location", + Type: "add_location", + AddLocation: &api.NetworkGenericRule{ + Input: "8888IP", + Output: "8888IP_location", + }, }, } } @@ -173,10 +187,11 @@ parameters: type: network network: rules: - - input: srcIP - output: subnetSrcIP - type: add_subnet - parameters: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: subnetSrcIP + subnet_mask: /24 - name: write1 write: type: stdout @@ -212,10 +227,10 @@ func Test_Categorize(t *testing.T) { Transform: &config.Transform{ Network: &api.TransformNetwork{ Rules: []api.NetworkTransformRule{ - {Type: api.OpAddIPCategory, Input: "addr1", Output: "cat1"}, - {Type: api.OpAddIPCategory, Input: "addr2", Output: "cat2"}, - {Type: api.OpAddIPCategory, Input: "addr3", Output: "cat3"}, - {Type: api.OpAddIPCategory, Input: "addr4", Output: "cat4"}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr1", Output: "cat1"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr2", Output: "cat2"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr3", Output: "cat3"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr4", Output: "cat4"}}, }, IPCategories: []api.NetworkTransformIPCategory{{ Name: "Pods overlay", diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 2435ff45f..2f0f2b8d3 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -42,42 +42,48 @@ data: transform: network: rules: - - input: dstPort - output: service - type: add_service - parameters: proto - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 - - input: srcIP - output: srcK8S - type: add_kubernetes - parameters: srcK8S_labels - - input: dstIP - output: dstLocation - type: add_location - parameters: "" - - input: bytes - output: mice - type: add_if - parameters: <512 - - input: bytes - output: elephant - type: add_if - parameters: '>=512' + - type: add_service + add_service: + input: dstPort + output: service + protocol: proto + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + parameters: /16 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet + subnet_mask: /16 + - type: add_kubernetes + kubernetes: + input: srcIP + output: srcK8S + labels_prefix: srcK8S_labels + - type: add_location + add_location: + input: dstIP + output: dstLocation + # - input: bytes + # output: mice + # type: add_if + # parameters: <512 + # - input: bytes + # output: elephant + # type: add_if + # parameters: '>=512' type: network - extract: aggregates: diff --git a/pkg/test/network_defs.go b/pkg/test/network_defs.go index 64d396edd..5d29ce1ed 100644 --- a/pkg/test/network_defs.go +++ b/pkg/test/network_defs.go @@ -67,10 +67,11 @@ tags: - label transform: rules: - - input: testInput - output: testOutput - type: add_service - parameters: proto + - type: add_service + add_service: + input: testInput + output: testOutput + protocol: proto extract: types: aggregates aggregates: @@ -149,10 +150,11 @@ tags: - label transform: rules: - - input: testInput - output: testOutput - type: add_service - parameters: proto + - type: add_service + add_service: + input: testInput + output: testOutput + protocol: proto encode: type: prom prom: