diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index b9f450821..27939084c 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -50,7 +50,7 @@ func TestPipelineConfigSetup(t *testing.T) { js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", - "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstAddr\",\"output\":\"DstK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstPort\",\"output\":\"Service\",\"parameters\":\"Proto\",\"type\":\"add_service\"},{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"parameters\":\"/16\",\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", + "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", "Health": { "Port": "8080" }, diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 28167bbbd..1156c6276 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -63,23 +63,50 @@ func TransformNetworkOperationName(operation string) string { } type NetworkTransformRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"` - Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"` - Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` - KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule specific configuration"` - Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule specific configuration"` + Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"` + KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"` + Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"` + AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"` + AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"` + AddIPCategory *NetworkGenericRule `yaml:"add_ip_category,omitempty" json:"add_ip_category,omitempty" doc:"Add ip category rule configuration"` + AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"` } type K8sInfraRule struct { - Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - InfraPrefix string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"` + Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + InfraPrefixes []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"` + InfraRefs []K8sReference `yaml:"infra_refs,omitempty" json:"infra_refs,omitempty" doc:"Additional object references to be tagged as infra"` +} + +type K8sReference struct { + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the object"` + Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty" doc:"namespace of the object"` } type K8sRule struct { - AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` + LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` + AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"` +} + +type NetworkGenericRule struct { + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` +} + +type NetworkAddSubnetRule struct { + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + SubnetMask string `yaml:"subnet_mask,omitempty" json:"subnet_mask,omitempty" doc:"subnet mask field"` +} + +type NetworkAddServiceRule struct { + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty" doc:"entry protocol field"` } type NetworkTransformDirectionInfo struct { diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 4e2b0b975..a36dee60d 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -124,10 +124,12 @@ func Test_RunShortConfGen(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[1].Transform.Network.Rules[0]) // Expects aggregates @@ -212,10 +214,12 @@ func Test_RunConfGenNoAgg(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[1].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[1].Transform.Network.Rules[0]) // Expects prom encode @@ -299,10 +303,12 @@ func Test_RunLongConfGen(t *testing.T) { // Expects transform network require.Len(t, out.Parameters[2].Transform.Network.Rules, 1) require.Equal(t, api.NetworkTransformRule{ - Input: "testInput", - Output: "testOutput", - Type: "add_service", - Parameters: "proto", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "testInput", + Output: "testOutput", + Protocol: "proto", + }, }, out.Parameters[2].Transform.Network.Rules[0]) // Expects aggregates diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index 8a069d2d6..4d0e5de5f 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -29,25 +29,26 @@ func (cg *ConfGen) dedupe() { cg.aggregates.Rules = dedupeAggregateDefinitions(cg.aggregates.Rules) } -type void struct{} - -var voidMember void - func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTransformRules { - // There are no built-in sets in go - //https://stackoverflow.com/a/34020023/2749989 - uniqueSet := make(map[api.NetworkTransformRule]void) - var dedpueSlice []api.NetworkTransformRule + var dedupeSlice []api.NetworkTransformRule for i, rule := range rules { - if _, exists := uniqueSet[rule]; exists { - // duplicate rule - log.Debugf("Remove duplicate NetworkTransformRule %v at index %v", rule, i) + if containsNetworkTransformRule(dedupeSlice, rule) { + // duplicate aggregateDefinition + log.Debugf("Remove duplicate transformation rule %v at index %v", rule, i) continue } - uniqueSet[rule] = voidMember - dedpueSlice = append(dedpueSlice, rule) + dedupeSlice = append(dedupeSlice, rule) } - return dedpueSlice + return dedupeSlice +} + +func containsNetworkTransformRule(slice []api.NetworkTransformRule, rule api.NetworkTransformRule) bool { + for _, item := range slice { + if reflect.DeepEqual(item, rule) { + return true + } + } + return false } // dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates. diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 8795c163f..70bddae15 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -26,15 +26,15 @@ import ( func Test_dedupeNetworkTransformRules(t *testing.T) { slice := api.NetworkTransformRules{ - api.NetworkTransformRule{Input: "i1", Output: "o1"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, - api.NetworkTransformRule{Input: "i3", Output: "o3"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, } expected := api.NetworkTransformRules{ - api.NetworkTransformRule{Input: "i1", Output: "o1"}, - api.NetworkTransformRule{Input: "i2", Output: "o2"}, - api.NetworkTransformRule{Input: "i3", Output: "o3"}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, } actual := dedupeNetworkTransformRules(slice) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index d5670de4c..5711dae26 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -29,13 +29,17 @@ import ( func TestLokiPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, - Input: "SrcAddr", - Output: "SrcK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8S", + }, }, { - Type: api.AddKubernetesRuleType, - Input: "DstAddr", - Output: "DstK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8S", + }, }}}) pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) stages := pl.GetStages() @@ -54,7 +58,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -199,13 +203,17 @@ func TestForkPipeline(t *testing.T) { func TestIPFIXPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, - Input: "SrcAddr", - Output: "SrcK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8S", + }, }, { - Type: api.AddKubernetesRuleType, - Input: "DstAddr", - Output: "DstK8S", + Type: api.AddKubernetesRuleType, + Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8S", + }, }}}) pl = pl.WriteIpfix("ipfix", api.WriteIpfix{ TargetHost: "ipfix-receiver-test", @@ -229,7 +237,7 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 7e220c462..e6ed55bc6 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -2,6 +2,7 @@ package kubernetes import ( "fmt" + "strings" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -20,7 +21,7 @@ func InitFromConfig(kubeConfigPath string) error { return informers.InitFromConfig(kubeConfigPath) } -func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { +func Enrich(outputEntry config.GenericMap, rule api.K8sRule) { kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) @@ -36,9 +37,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { outputEntry[rule.Output+"_Type"] = kubeInfo.Type outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type - if rule.Parameters != "" { + if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"_"+labelKey] = labelValue + outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue } } if kubeInfo.HostIP != "" { @@ -70,9 +71,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type - if rule.Parameters != "" { + if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { - outputEntry[rule.Parameters+"."+labelKey] = labelValue + outputEntry[rule.LabelsPrefix+"."+labelKey] = labelValue } } if kubeInfo.HostIP != "" { @@ -87,8 +88,8 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) { const nodeZoneLabelName = "topology.kubernetes.io/zone" -func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo inf.Info, zonePrefix string) { - if rule.Kubernetes == nil || !rule.Kubernetes.AddZone { +func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo inf.Info, zonePrefix string) { + if !rule.AddZone { //Nothing to do return } @@ -119,39 +120,34 @@ func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, } } -func EnrichLayer(outputEntry config.GenericMap, rule api.NetworkTransformRule) { - if rule.KubernetesInfra == nil { - logrus.Error("transformation rule: Missing Kubernetes Infra configuration ") - return - } - outputEntry[rule.KubernetesInfra.Output] = "infra" - for _, input := range rule.KubernetesInfra.Inputs { - if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) { - outputEntry[rule.KubernetesInfra.Output] = "app" +func EnrichLayer(outputEntry config.GenericMap, rule api.K8sInfraRule) { + outputEntry[rule.Output] = "infra" + for _, input := range rule.Inputs { + if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule) { + outputEntry[rule.Output] = "app" return } } } -const openshiftNamespacePrefix = "openshift-" -const openshiftPrefixLen = len(openshiftNamespacePrefix) - -func objectIsApp(addr string, additionalInfraPrefix string) bool { +func objectIsApp(addr string, rule api.K8sInfraRule) bool { obj, err := informers.GetInfo(addr) if err != nil { logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr) return false } - nsLen := len(obj.Namespace) - additionalPrefixLen := len(additionalInfraPrefix) - if nsLen == 0 { + if len(obj.Namespace) == 0 { return false } - if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix { - return false + for _, prefix := range rule.InfraPrefixes { + if strings.HasPrefix(obj.Namespace, prefix) { + return false + } } - if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix { - return false + for _, ref := range rule.InfraRefs { + if obj.Namespace == ref.Namespace && obj.Name == ref.Name { + return false + } } //Special case with openshift and kubernetes service in default namespace if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") { diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index a3e7a4658..c4598bf4a 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -63,18 +63,18 @@ var nodes = map[string]*inf.Info{ var rules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, - Input: "SrcAddr", - Output: "SrcK8s", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ + Input: "SrcAddr", + Output: "SrcK8s", AddZone: true, }, }, { - Type: api.OpAddKubernetes, - Input: "DstAddr", - Output: "DstK8s", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ + Input: "DstAddr", + Output: "DstK8s", AddZone: true, }, }, @@ -89,7 +89,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "42.42.42.42", // unknown } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "42.42.42.42", @@ -110,7 +110,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "10.0.0.2", // pod-2 } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "10.0.0.2", @@ -139,7 +139,7 @@ func TestEnrich(t *testing.T) { "DstAddr": "20.0.0.1", // service-1 } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "DstAddr": "20.0.0.1", @@ -162,21 +162,21 @@ func TestEnrich(t *testing.T) { var otelRules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, - Input: "source.ip", - Output: "source.", - Assignee: "otel", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ - AddZone: true, + Input: "source.ip", + Output: "source.", + Assignee: "otel", + AddZone: true, }, }, { - Type: api.OpAddKubernetes, - Input: "destination.ip", - Output: "destination.", - Assignee: "otel", + Type: api.OpAddKubernetes, Kubernetes: &api.K8sRule{ - AddZone: true, + Input: "destination.ip", + Output: "destination.", + Assignee: "otel", + AddZone: true, }, }, } @@ -190,7 +190,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "42.42.42.42", // unknown } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "42.42.42.42", @@ -213,7 +213,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "10.0.0.2", // pod-2 } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "10.0.0.2", @@ -246,7 +246,7 @@ func TestEnrich_Otel(t *testing.T) { "destination.ip": "20.0.0.1", // service-1 } for _, r := range otelRules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.Equal(t, config.GenericMap{ "destination.ip": "20.0.0.1", @@ -282,9 +282,150 @@ func TestEnrich_EmptyNamespace(t *testing.T) { } for _, r := range rules { - Enrich(entry, r) + Enrich(entry, *r.Kubernetes) } assert.NotContains(t, entry, "SrcK8s_Namespace") assert.NotContains(t, entry, "DstK8s_Namespace") } + +var infoLayers = map[string]*inf.Info{ + "1.2.3.4": nil, + "10.0.0.1": { + ObjectMeta: v1.ObjectMeta{ + Name: "prometheus", + Namespace: "openshift-monitoring", + }, + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + }, + "10.0.0.2": { + ObjectMeta: v1.ObjectMeta{ + Name: "pod-2", + Namespace: "ns-2", + }, + Type: "Pod", + HostName: "host-2", + HostIP: "100.0.0.2", + }, + "10.0.0.3": { + ObjectMeta: v1.ObjectMeta{ + Name: "flowlogs-pipeline-1", + Namespace: "netobserv", + }, + Type: "Pod", + HostName: "host-2", + HostIP: "100.0.0.2", + }, + "20.0.0.1": { + ObjectMeta: v1.ObjectMeta{ + Name: "kubernetes", + Namespace: "default", + }, + Type: "Service", + }, + "20.0.0.2": { + ObjectMeta: v1.ObjectMeta{ + Name: "my-service", + Namespace: "my-ns", + }, + Type: "Service", + }, + "30.0.0.1": { + ObjectMeta: v1.ObjectMeta{ + Name: "node-x", + }, + Type: "Node", + }, +} + +func TestEnrichLayer(t *testing.T) { + informers = inf.SetupStubs(infoLayers, nodes) + + rule := api.NetworkTransformRule{ + KubernetesInfra: &api.K8sInfraRule{ + Inputs: []string{"SrcAddr", "DstAddr"}, + Output: "K8S_FlowLayer", + InfraPrefixes: []string{"netobserv", "openshift-"}, + InfraRefs: []api.K8sReference{ + { + Name: "kubernetes", + Namespace: "default", + }, + }, + }, + } + + // infra to infra => infra + flow := config.GenericMap{ + "SrcAddr": "10.0.0.1", // openshift-monitoring + "DstAddr": "10.0.0.3", // netobserv/flp + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "infra", flow["K8S_FlowLayer"]) + + // infra to node => infra + flow = config.GenericMap{ + "SrcAddr": "10.0.0.1", // openshift-monitoring + "DstAddr": "30.0.0.1", // node + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "infra", flow["K8S_FlowLayer"]) + + // node to kubernetes => infra + flow = config.GenericMap{ + "SrcAddr": "30.0.0.1", // node + "DstAddr": "20.0.0.1", // kube service + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "infra", flow["K8S_FlowLayer"]) + + // node to external => infra + flow = config.GenericMap{ + "SrcAddr": "30.0.0.1", // node + "DstAddr": "1.2.3.4", // external + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "infra", flow["K8S_FlowLayer"]) + + // app to app => app + flow = config.GenericMap{ + "SrcAddr": "10.0.0.2", // app pod + "DstAddr": "20.0.0.2", // app service + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "app", flow["K8S_FlowLayer"]) + + // node to app => app + flow = config.GenericMap{ + "SrcAddr": "30.0.0.1", // node + "DstAddr": "20.0.0.2", // app service + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "app", flow["K8S_FlowLayer"]) + + // app to infra => app + flow = config.GenericMap{ + "SrcAddr": "10.0.0.2", // app pod + "DstAddr": "20.0.0.1", // kube service + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "app", flow["K8S_FlowLayer"]) + + // app to external => app + flow = config.GenericMap{ + "SrcAddr": "10.0.0.2", // app pod + "DstAddr": "1.2.3.4", // external + } + EnrichLayer(flow, *rule.KubernetesInfra) + + assert.Equal(t, "app", flow["K8S_FlowLayer"]) +} diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index f6c4ea8ee..a92be790e 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -55,30 +55,42 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo for _, rule := range n.Rules { switch rule.Type { case api.OpAddSubnet: - _, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters)) + if rule.AddSubnet == nil { + log.Errorf("Missing add subnet configuration") + continue + } + _, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.AddSubnet.Input], rule.AddSubnet.SubnetMask)) if err != nil { - log.Warningf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntry[rule.Input], rule.Parameters, err) + log.Warningf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntry[rule.AddSubnet.Input], rule.AddSubnet.SubnetMask, err) continue } - outputEntry[rule.Output] = ipv4Net.String() + outputEntry[rule.AddSubnet.Output] = ipv4Net.String() case api.OpAddLocation: + if rule.AddLocation == nil { + log.Errorf("Missing add location configuration") + continue + } var locationInfo *location.Info - err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntry[rule.Input])) + err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntry[rule.AddLocation.Input])) if err != nil { - log.Warningf("Can't find location for IP %v err %v", outputEntry[rule.Input], err) + log.Warningf("Can't find location for IP %v err %v", outputEntry[rule.AddLocation.Input], err) continue } - outputEntry[rule.Output+"_CountryName"] = locationInfo.CountryName - outputEntry[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName - outputEntry[rule.Output+"_RegionName"] = locationInfo.RegionName - outputEntry[rule.Output+"_CityName"] = locationInfo.CityName - outputEntry[rule.Output+"_Latitude"] = locationInfo.Latitude - outputEntry[rule.Output+"_Longitude"] = locationInfo.Longitude + outputEntry[rule.AddLocation.Output+"_CountryName"] = locationInfo.CountryName + outputEntry[rule.AddLocation.Output+"_CountryLongName"] = locationInfo.CountryLongName + outputEntry[rule.AddLocation.Output+"_RegionName"] = locationInfo.RegionName + outputEntry[rule.AddLocation.Output+"_CityName"] = locationInfo.CityName + outputEntry[rule.AddLocation.Output+"_Latitude"] = locationInfo.Latitude + outputEntry[rule.AddLocation.Output+"_Longitude"] = locationInfo.Longitude case api.OpAddService: - protocol := fmt.Sprintf("%v", outputEntry[rule.Parameters]) - portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.Input])) + if rule.AddService == nil { + log.Errorf("Missing add service configuration") + continue + } + protocol := fmt.Sprintf("%v", outputEntry[rule.AddService.Protocol]) + portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.AddService.Input])) if err != nil { - log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.Input], err) + log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.AddService.Input], err) continue } var serviceName string @@ -92,25 +104,33 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo } if serviceName == "" { if err != nil { - log.Debugf("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err) + log.Debugf("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.AddService.Input], protocol, err) continue } } - outputEntry[rule.Output] = serviceName + outputEntry[rule.AddService.Output] = serviceName case api.OpAddKubernetes: - kubernetes.Enrich(outputEntry, rule) + kubernetes.Enrich(outputEntry, *rule.Kubernetes) case api.OpAddKubernetesInfra: - kubernetes.EnrichLayer(outputEntry, rule) + if rule.KubernetesInfra == nil { + logrus.Error("transformation rule: Missing configuration ") + continue + } + kubernetes.EnrichLayer(outputEntry, *rule.KubernetesInfra) case api.OpReinterpretDirection: reinterpretDirection(outputEntry, &n.DirectionInfo) case api.OpAddIPCategory: - if strIP, ok := outputEntry[rule.Input].(string); ok { + if rule.AddIPCategory == nil { + logrus.Error("AddIPCategory rule: Missing configuration ") + continue + } + if strIP, ok := outputEntry[rule.AddIPCategory.Input].(string); ok { cat, ok := n.ipCatCache.GetCacheEntry(strIP) if !ok { cat = n.categorizeIP(net.ParseIP(strIP)) n.ipCatCache.UpdateCacheEntry(strIP, cat) } - outputEntry[rule.Output] = cat + outputEntry[rule.AddIPCategory.Output] = cat } default: diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 58a2409f0..21da21cbf 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -33,45 +33,59 @@ import ( func getMockNetworkTransformRules() api.NetworkTransformRules { return api.NetworkTransformRules{ api.NetworkTransformRule{ - Input: "srcIP", - Output: "subnet16SrcIP", - Type: "add_subnet", - Parameters: "/16", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "srcIP", + Output: "subnet16SrcIP", + SubnetMask: "/16", + }, }, api.NetworkTransformRule{ - Input: "srcIP", - Output: "subnet24SrcIP", - Type: "add_subnet", - Parameters: "/24", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "srcIP", + Output: "subnet24SrcIP", + SubnetMask: "/24", + }, }, api.NetworkTransformRule{ - Input: "emptyIP", - Output: "cidr_fail_skip", - Type: "add_subnet", - Parameters: "/16", + Type: "add_subnet", + AddSubnet: &api.NetworkAddSubnetRule{ + Input: "emptyIP", + Output: "cidr_fail_skip", + SubnetMask: "/16", + }, }, api.NetworkTransformRule{ - Input: "dstPort", - Output: "service", - Type: "add_service", - Parameters: "protocol", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "dstPort", + Output: "service", + Protocol: "protocol", + }, }, api.NetworkTransformRule{ - Input: "dstPort", - Output: "service_protocol_num", - Type: "add_service", - Parameters: "protocol_num", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "dstPort", + Output: "service_protocol_num", + Protocol: "protocol_num", + }, }, api.NetworkTransformRule{ - Input: "srcPort", - Output: "unknown_service", - Type: "add_service", - Parameters: "protocol", + Type: "add_service", + AddService: &api.NetworkAddServiceRule{ + Input: "srcPort", + Output: "unknown_service", + Protocol: "protocol", + }, }, api.NetworkTransformRule{ - Input: "8888IP", - Output: "8888IP_location", - Type: "add_location", + Type: "add_location", + AddLocation: &api.NetworkGenericRule{ + Input: "8888IP", + Output: "8888IP_location", + }, }, } } @@ -173,10 +187,11 @@ parameters: type: network network: rules: - - input: srcIP - output: subnetSrcIP - type: add_subnet - parameters: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: subnetSrcIP + subnet_mask: /24 - name: write1 write: type: stdout @@ -212,10 +227,10 @@ func Test_Categorize(t *testing.T) { Transform: &config.Transform{ Network: &api.TransformNetwork{ Rules: []api.NetworkTransformRule{ - {Type: api.OpAddIPCategory, Input: "addr1", Output: "cat1"}, - {Type: api.OpAddIPCategory, Input: "addr2", Output: "cat2"}, - {Type: api.OpAddIPCategory, Input: "addr3", Output: "cat3"}, - {Type: api.OpAddIPCategory, Input: "addr4", Output: "cat4"}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr1", Output: "cat1"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr2", Output: "cat2"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr3", Output: "cat3"}}, + {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr4", Output: "cat4"}}, }, IPCategories: []api.NetworkTransformIPCategory{{ Name: "Pods overlay", diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 2435ff45f..8ea125330 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -42,42 +42,40 @@ data: transform: network: rules: - - input: dstPort - output: service - type: add_service - parameters: proto - - input: dstIP - output: dstSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet24 - type: add_subnet - parameters: /24 - - input: srcIP - output: srcSubnet - type: add_subnet - parameters: /16 - - input: dstIP - output: dstSubnet - type: add_subnet - parameters: /16 - - input: srcIP - output: srcK8S - type: add_kubernetes - parameters: srcK8S_labels - - input: dstIP - output: dstLocation - type: add_location - parameters: "" - - input: bytes - output: mice - type: add_if - parameters: <512 - - input: bytes - output: elephant - type: add_if - parameters: '>=512' + - type: add_service + add_service: + input: dstPort + output: service + protocol: proto + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet24 + subnet_mask: /24 + - type: add_subnet + add_subnet: + input: srcIP + output: srcSubnet + subnet_mask: /16 + - type: add_subnet + add_subnet: + input: dstIP + output: dstSubnet + subnet_mask: /16 + - type: add_kubernetes + kubernetes: + input: srcIP + output: srcK8S + labels_prefix: srcK8S_labels + - type: add_location + add_location: + input: dstIP + output: dstLocation type: network - extract: aggregates: diff --git a/pkg/test/e2e/pipline/pipline_test.go b/pkg/test/e2e/pipline/pipline_test.go index 52268c10f..59dd552c0 100644 --- a/pkg/test/e2e/pipline/pipline_test.go +++ b/pkg/test/e2e/pipline/pipline_test.go @@ -35,6 +35,29 @@ import ( "sigs.k8s.io/e2e-framework/pkg/features" ) +func printLogsFromPods(t *testing.T, cfg *envconf.Config) { + client, err := cfg.NewClient() + if err != nil { + t.Fatal(err) + } + + // coreV1Client from current context in kubeconfig + coreV1Client, err := e2e.GetCoreV1Client(cfg.KubeconfigFile()) + if err != nil { + t.Fatal(err) + } + + var pods corev1.PodList + err = client.Resources(cfg.Namespace()).List(context.TODO(), &pods) + if err != nil { + t.Fatal(err) + } + + logs := e2e.LogsFromPods(pods, coreV1Client, cfg.Namespace()) + fmt.Print(logs) + +} + func TestPipeline_Basic(t *testing.T) { pipelineFeature := features.New("FLP/pipeline").WithLabel("env", "dev"). Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { @@ -51,6 +74,7 @@ func TestPipeline_Basic(t *testing.T) { // wait for the deployment to finish becoming available err = wait.For(conditions.New(client.Resources()).DeploymentConditionMatch(&dep, appsv1.DeploymentAvailable, v1.ConditionTrue), wait.WithTimeout(time.Minute*3)) if err != nil { + printLogsFromPods(t, cfg) t.Fatal(err) } diff --git a/pkg/test/network_defs.go b/pkg/test/network_defs.go index 64d396edd..5d29ce1ed 100644 --- a/pkg/test/network_defs.go +++ b/pkg/test/network_defs.go @@ -67,10 +67,11 @@ tags: - label transform: rules: - - input: testInput - output: testOutput - type: add_service - parameters: proto + - type: add_service + add_service: + input: testInput + output: testOutput + protocol: proto extract: types: aggregates aggregates: @@ -149,10 +150,11 @@ tags: - label transform: rules: - - input: testInput - output: testOutput - type: add_service - parameters: proto + - type: add_service + add_service: + input: testInput + output: testOutput + protocol: proto encode: type: prom prom: