From 2ea1a7ce77b87236a47f308e37abdbd886756d29 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Fri, 20 Sep 2024 08:36:18 +0200 Subject: [PATCH] NETOBSERV-1875: Make enrichment indexes configurable (#711) * NETOBSERV-1875: Make enrichment indexes configurable For secondary interfaces, it's better to allow more configurable indexing for pods enrichment. For some network types, pods might be identified by their MAC address advertized in the network-status annotations, while in others it can be by IP, by interface name, or any combination of those. This PR makes it all configurable in that way (example): ```bash secondaryNetworks: - name: my-network index: mac: true ip: true ``` More than one network can be configured that way. Additionally, a new metric is added to track which indexer is used for every enrichment hit: promql: `sum(rate(netobserv_secondary_network_indexer_hit[1m])) by (kind, network, error)` This metric can also report errors and unexpected multiple matches * Add namespace label to metric, add network name enrichment * do not return from building keys when a field is missing: move to next network --- README.md | 4 +- cmd/flowlogs-pipeline/main_test.go | 2 +- .../kubernetes/flowlogs-pipeline.conf.yaml | 2 +- docs/api.md | 10 +- docs/operational-metrics.md | 8 + pkg/api/transform_network.go | 26 ++- pkg/confgen/dedup_test.go | 14 +- pkg/config/pipeline_builder_test.go | 20 +- pkg/operational/metrics.go | 13 ++ pkg/pipeline/pipeline_builder.go | 4 +- pkg/pipeline/transform/kubernetes/cni/cni.go | 1 - .../transform/kubernetes/cni/multus.go | 139 +++++++++--- .../transform/kubernetes/cni/multus_test.go | 49 +++-- .../kubernetes/cni/ovn_kubernetes.go | 5 - pkg/pipeline/transform/kubernetes/enrich.go | 20 +- .../transform/kubernetes/enrich_test.go | 176 ++++++++------- .../kubernetes/informers/informers-mock.go | 64 ++++-- .../kubernetes/informers/informers.go | 204 +++++++++++------- .../kubernetes/informers/informers_test.go | 73 ++++--- pkg/pipeline/transform/transform_network.go | 5 +- .../transform/transform_network_test.go | 16 +- pkg/test/e2e/pipline/flp-config.yaml | 2 +- 22 files changed, 529 insertions(+), 328 deletions(-) diff --git a/README.md b/README.md index 4ca444be6..eb4dc339b 100644 --- a/README.md +++ b/README.md @@ -422,7 +422,7 @@ parameters: output: dstLocation - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S ``` @@ -443,7 +443,7 @@ All the geo-location fields will be named by appending `output` value (e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`) The rule `add_kubernetes` generates new fields with kubernetes information by -matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs. +matching the `ipField` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs. All the kubernetes fields will be named by appending `output` value (`srcK8S` in the example above) to the kubernetes metadata field names (e.g., `Namespace`, `Name`, `Type`, `HostIP`, `OwnerName`, `OwnerType` ) diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index dded3bb69..2f0a3c8ae 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -52,7 +52,7 @@ func TestPipelineConfigSetup(t *testing.T) { js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", - "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", + "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"ipField\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"ipField\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", "Health": { "Port": "8080" }, diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index f597decea..e09e62d9e 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -144,7 +144,7 @@ parameters: subnet_mask: /16 - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S labels_prefix: srcK8S_labels - type: add_location diff --git a/docs/api.md b/docs/api.md index 13a192d43..19fb3f54a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -250,8 +250,9 @@ Following is the supported API format for network transformations: name: name of the object namespace: namespace of the object kubernetes: Kubernetes rule configuration - input: entry IP input field - mac-input: Optional entry MAC input field + ipField: entry IP input field + interfacesField: entry Interfaces input field + macField: entry MAC input field output: entry output field assignee: value needs to assign to output field labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied @@ -272,7 +273,10 @@ Following is the supported API format for network transformations: protocol: entry protocol field kubeConfig: global configuration related to Kubernetes (optional) configPath: path to kubeconfig file (optional) - managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus + secondaryNetworks: configuration for secondary networks + name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status' + index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface' + managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn servicesFile: path to services file (optional, default: /etc/services) protocolsFile: path to protocols file (optional, default: /etc/protocols) subnetLabels: configure subnet and IPs custom labels diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 641d265f2..e79e7e556 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -143,6 +143,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | stage | +### secondary_network_indexer_hit +| **Name** | secondary_network_indexer_hit | +|:---|:---| +| **Description** | Counter of hits per secondary network index for Kubernetes enrichment | +| **Type** | counter | +| **Labels** | kind, network, warning | + + ### stage_duration_ms | **Name** | stage_duration_ms | |:---|:---| diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 8c60c0aa8..baf922230 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -39,13 +39,13 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) { } const ( - OVN = "ovn" - Multus = "multus" + OVN = "ovn" ) type NetworkTransformKubeConfig struct { - ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"` - ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus"` + ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"` + SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"` + ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"` } type TransformNetworkOperationEnum string @@ -84,12 +84,18 @@ type K8sReference struct { } type K8sRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry IP input field"` - MacInput string `yaml:"mac-input,omitempty" json:"mac-input,omitempty" doc:"Optional entry MAC input field"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` - LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` - AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"` + IPField string `yaml:"ipField,omitempty" json:"ipField,omitempty" doc:"entry IP input field"` + InterfacesField string `yaml:"interfacesField,omitempty" json:"interfacesField,omitempty" doc:"entry Interfaces input field"` + MACField string `yaml:"macField,omitempty" json:"macField,omitempty" doc:"entry MAC input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` + LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` + AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"` +} + +type SecondaryNetwork struct { + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'"` + Index map[string]any `yaml:"index,omitempty" json:"index,omitempty" doc:"fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'"` } type NetworkGenericRule struct { diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 70bddae15..e7a59367f 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -26,15 +26,15 @@ import ( func Test_dedupeNetworkTransformRules(t *testing.T) { slice := api.NetworkTransformRules{ - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, } expected := api.NetworkTransformRules{ - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}}, } actual := dedupeNetworkTransformRules(slice) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 8b10142fb..2f8d8d357 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -31,14 +31,14 @@ func TestLokiPipeline(t *testing.T) { pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - Output: "SrcK8S", + IPField: "SrcAddr", + Output: "SrcK8S", }, }, { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - Output: "DstK8S", + IPField: "DstAddr", + Output: "DstK8S", }, }}}) pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) @@ -58,7 +58,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -206,14 +206,14 @@ func TestIPFIXPipeline(t *testing.T) { pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - Output: "SrcK8S", + IPField: "SrcAddr", + Output: "SrcK8S", }, }, { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - Output: "DstK8S", + IPField: "DstAddr", + Output: "DstK8S", }, }}}) pl = pl.WriteIpfix("ipfix", api.WriteIpfix{ @@ -238,7 +238,7 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/operational/metrics.go b/pkg/operational/metrics.go index 44488f232..d0008aa3f 100644 --- a/pkg/operational/metrics.go +++ b/pkg/operational/metrics.go @@ -80,6 +80,15 @@ var ( TypeHistogram, "stage", ) + indexerHit = DefineMetric( + "secondary_network_indexer_hit", + "Counter of hits per secondary network index for Kubernetes enrichment", + TypeCounter, + "kind", + "namespace", + "network", + "warning", + ) ) func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels { @@ -241,6 +250,10 @@ func (o *Metrics) GetOrCreateStageDurationHisto() *prometheus.HistogramVec { return o.stageDurationHisto } +func (o *Metrics) CreateIndexerHitCounter() *prometheus.CounterVec { + return o.NewCounterVec(&indexerHit) +} + func GetDocumentation() string { doc := "" sort.Slice(allMetrics, func(i, j int) bool { diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index dfa10cce8..cfffc894f 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -434,7 +434,7 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write. return writer, err } -func getTransformer(_ *operational.Metrics, params config.StageParam) (transform.Transformer, error) { +func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (transform.Transformer, error) { var transformer transform.Transformer var err error switch params.Transform.Type { @@ -443,7 +443,7 @@ func getTransformer(_ *operational.Metrics, params config.StageParam) (transform case api.FilterType: transformer, err = transform.NewTransformFilter(params) case api.NetworkType: - transformer, err = transform.NewTransformNetwork(params) + transformer, err = transform.NewTransformNetwork(params, opMetrics) case api.NoneType: transformer, err = transform.NewTransformNone() default: diff --git a/pkg/pipeline/transform/kubernetes/cni/cni.go b/pkg/pipeline/transform/kubernetes/cni/cni.go index ef787a74d..05533886d 100644 --- a/pkg/pipeline/transform/kubernetes/cni/cni.go +++ b/pkg/pipeline/transform/kubernetes/cni/cni.go @@ -6,5 +6,4 @@ import ( type Plugin interface { GetNodeIPs(node *v1.Node) []string - GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string) } diff --git a/pkg/pipeline/transform/kubernetes/cni/multus.go b/pkg/pipeline/transform/kubernetes/cni/multus.go index 2fd073377..b37d45ea9 100644 --- a/pkg/pipeline/transform/kubernetes/cni/multus.go +++ b/pkg/pipeline/transform/kubernetes/cni/multus.go @@ -5,59 +5,130 @@ import ( "fmt" "strings" - log "github.com/sirupsen/logrus" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" v1 "k8s.io/api/core/v1" ) const ( statusAnnotation = "k8s.v1.cni.cncf.io/network-status" + // Index names + indexIP = "ip" + indexMAC = "mac" + indexInterface = "interface" ) -type MultusPlugin struct { - Plugin +type MultusHandler struct { } -func (m *MultusPlugin) GetNodeIPs(_ *v1.Node) []string { - // No CNI-specific logic needed for pods - return nil +type SecondaryNetKey struct { + NetworkName string + Key string } -func (m *MultusPlugin) GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string) { - // Cf https://k8snetworkplumbingwg.github.io/multus-cni/docs/quickstart.html#network-status-annotations - ips, macs, err := extractNetStatusIPsAndMACs(pod.Annotations) - if err != nil { - // Log the error as Info, do not block other ips indexing - log.Infof("failed to index IPs from network-status annotation: %v", err) +func (m *MultusHandler) BuildKeys(flow config.GenericMap, rule *api.K8sRule, secNets []api.SecondaryNetwork) []SecondaryNetKey { + if len(secNets) == 0 { + return nil + } + var keys []SecondaryNetKey + for _, sn := range secNets { + snKeys := m.buildSNKeys(flow, rule, &sn) + if snKeys != nil { + keys = append(keys, snKeys...) + } } - log.Tracef("GetPodIPsAndMACs found ips: %v macs: %v for pod %s", ips, macs, pod.Name) - return ips, macs + return keys } -type netStatItem struct { - IPs []string `json:"ips"` - MAC string `json:"mac"` -} +func (m *MultusHandler) buildSNKeys(flow config.GenericMap, rule *api.K8sRule, sn *api.SecondaryNetwork) []SecondaryNetKey { + var keys []SecondaryNetKey -func extractNetStatusIPsAndMACs(annotations map[string]string) ([]string, []string, error) { - if statusAnnotationJSON, ok := annotations[statusAnnotation]; ok { - var ips, macs []string - var networks []netStatItem - err := json.Unmarshal([]byte(statusAnnotationJSON), &networks) - if err == nil { - for _, network := range networks { - if len(network.IPs) > 0 { - ips = append(ips, network.IPs...) - } + var ip, mac string + var interfaces []string + if _, ok := sn.Index[indexIP]; ok && len(rule.IPField) > 0 { + ip, ok = flow.LookupString(rule.IPField) + if !ok { + return nil + } + } + if _, ok := sn.Index[indexMAC]; ok && len(rule.MACField) > 0 { + mac, ok = flow.LookupString(rule.MACField) + if !ok { + return nil + } + } + if _, ok := sn.Index[indexInterface]; ok && len(rule.InterfacesField) > 0 { + v, ok := flow[rule.InterfacesField] + if !ok { + return nil + } + interfaces, ok = v.([]string) + if !ok { + return nil + } + } - if len(network.MAC) > 0 { - macs = append(macs, strings.ToUpper(network.MAC)) + macIP := "~" + ip + "~" + mac + if interfaces == nil { + return []SecondaryNetKey{{NetworkName: sn.Name, Key: macIP}} + } + for _, intf := range interfaces { + keys = append(keys, SecondaryNetKey{NetworkName: sn.Name, Key: intf + macIP}) + } + + return keys +} + +func (m *MultusHandler) GetPodUniqueKeys(pod *v1.Pod, secNets []api.SecondaryNetwork) ([]string, error) { + if len(secNets) == 0 { + return nil, nil + } + // Cf https://k8snetworkplumbingwg.github.io/multus-cni/docs/quickstart.html#network-status-annotations + if statusAnnotationJSON, ok := pod.Annotations[statusAnnotation]; ok { + var networks []NetStatItem + if err := json.Unmarshal([]byte(statusAnnotationJSON), &networks); err != nil { + return nil, fmt.Errorf("failed to index from network-status annotation, cannot read annotation %s: %w", statusAnnotation, err) + } + var keys []string + for _, network := range networks { + for _, snConfig := range secNets { + if snConfig.Name == network.Name { + keys = append(keys, network.Keys(snConfig)...) } } - return ips, macs, nil } - - return nil, nil, fmt.Errorf("cannot read annotation %s: %w", statusAnnotation, err) + return keys, nil } // Annotation not present => just ignore, no error - return nil, nil, nil + return nil, nil +} + +type NetStatItem struct { + Name string `json:"name"` + Interface string `json:"interface"` + IPs []string `json:"ips"` + MAC string `json:"mac"` +} + +func (n *NetStatItem) Keys(snConfig api.SecondaryNetwork) []string { + var mac, intf string + if _, ok := snConfig.Index[indexMAC]; ok { + mac = n.MAC + } + if _, ok := snConfig.Index[indexInterface]; ok { + intf = n.Interface + } + if _, ok := snConfig.Index[indexIP]; ok { + var keys []string + for _, ip := range n.IPs { + keys = append(keys, key(intf, ip, mac)) + } + return keys + } + // Ignore IP + return []string{key(intf, "", mac)} +} + +func key(intf, ip, mac string) string { + return intf + "~" + ip + "~" + strings.ToUpper(mac) } diff --git a/pkg/pipeline/transform/kubernetes/cni/multus_test.go b/pkg/pipeline/transform/kubernetes/cni/multus_test.go index 69772b00e..88481d46e 100644 --- a/pkg/pipeline/transform/kubernetes/cni/multus_test.go +++ b/pkg/pipeline/transform/kubernetes/cni/multus_test.go @@ -3,27 +3,38 @@ package cni import ( "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestExtractNetStatusIPs(t *testing.T) { - // Annotation not found => no error, no ip - ip, mac, err := extractNetStatusIPsAndMACs(map[string]string{}) +var ( + multusHandler = MultusHandler{} + pod = v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}} + secondaryNetConfig = []api.SecondaryNetwork{ + { + Name: "macvlan-conf", + Index: map[string]any{"mac": nil}, + }, + } +) + +func TestExtractNetStatusKeys(t *testing.T) { + // Annotation not found => no error, no key + keys, err := multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.NoError(t, err) - require.Empty(t, ip) - require.Empty(t, mac) + require.Empty(t, keys) - // Annotation malformed => error, no ip - ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{ - statusAnnotation: "whatever", - }) + // Annotation malformed => error, no key + pod.Annotations = map[string]string{statusAnnotation: "whatever"} + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.Error(t, err) require.Contains(t, err.Error(), "cannot read annotation") - require.Empty(t, ip) - require.Empty(t, mac) + require.Empty(t, keys) - // Valid annotation => no error, ip - ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{ + // Valid annotation => no error, key + pod.Annotations = map[string]string{ statusAnnotation: ` [{ "name": "cbr0", @@ -31,6 +42,7 @@ func TestExtractNetStatusIPs(t *testing.T) { "10.244.1.73" ], "default": true, + "mac": "aa:aa:96:ff:aa:aa", "dns": {} },{ "name": "macvlan-conf", @@ -42,9 +54,14 @@ func TestExtractNetStatusIPs(t *testing.T) { "dns": {} }] `, - }) + } + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.NoError(t, err) - require.Equal(t, []string{"10.244.1.73", "192.168.1.205"}, ip) - require.Equal(t, []string{"86:1D:96:FF:55:0D"}, mac) + require.Equal(t, []string{"~~86:1D:96:FF:55:0D"}, keys) + // Composed key + secondaryNetConfig[0].Index = map[string]any{"mac": nil, "ip": nil, "interface": nil} + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) + require.NoError(t, err) + require.Equal(t, []string{"net1~192.168.1.205~86:1D:96:FF:55:0D"}, keys) } diff --git a/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go b/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go index 4ce4fd7c9..ae5701de1 100644 --- a/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go @@ -47,11 +47,6 @@ func (o *OVNPlugin) GetNodeIPs(node *v1.Node) []string { return nil } -func (o *OVNPlugin) GetPodIPsAndMACs(_ *v1.Pod) ([]string, []string) { - // No CNI-specific logic needed for pods - return nil, nil -} - func unmarshalOVNAnnotation(annot []byte) (string, error) { // Depending on OVN (OCP) version, the annotation might be JSON-encoded as a string (legacy), or an array of strings var subnetsAsArray map[string][]string diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 702e913cb..899e2f2ca 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -5,6 +5,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" "github.com/sirupsen/logrus" ) @@ -16,23 +17,19 @@ func MockInformers() { informers = inf.NewInformersMock() } -func InitFromConfig(config api.NetworkTransformKubeConfig) error { - return informers.InitFromConfig(config) +func InitFromConfig(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { + return informers.InitFromConfig(config, opMetrics) } func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) { - ip, _ := outputEntry.LookupString(rule.Input) - var mac string - if len(rule.MacInput) > 0 { - mac, _ = outputEntry.LookupString(rule.MacInput) - } - logrus.Tracef("Enrich IP %s MAC %s", ip, mac) - if ip == "" && mac == "" { + ip, ok := outputEntry.LookupString(rule.IPField) + if !ok { return } - kubeInfo, err := informers.GetInfo(ip, mac) + potentialKeys := informers.BuildSecondaryNetworkKeys(outputEntry, rule) + kubeInfo, err := informers.GetInfo(potentialKeys, ip) if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) + logrus.WithError(err).Tracef("can't find kubernetes info for keys %v and IP %s", potentialKeys, ip) return } if rule.Assignee != "otel" { @@ -45,6 +42,7 @@ func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) { outputEntry[rule.Output+"_Type"] = kubeInfo.Type outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type + outputEntry[rule.Output+"_NetworkName"] = kubeInfo.NetworkName if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index 634e631ae..3a2566400 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -18,37 +18,41 @@ var ipInfo = map[string]*inf.Info{ Name: "pod-1", Namespace: "ns-1", }, - Type: "Pod", - HostName: "host-1", - HostIP: "100.0.0.1", + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + NetworkName: "primary", }, "10.0.0.2": { ObjectMeta: v1.ObjectMeta{ Name: "pod-2", Namespace: "ns-2", }, - Type: "Pod", - HostName: "host-2", - HostIP: "100.0.0.2", + Type: "Pod", + HostName: "host-2", + HostIP: "100.0.0.2", + NetworkName: "primary", }, "20.0.0.1": { ObjectMeta: v1.ObjectMeta{ Name: "service-1", Namespace: "ns-1", }, - Type: "Service", + Type: "Service", + NetworkName: "primary", }, } -var macInfo = map[string]*inf.Info{ - "AA:BB:CC:DD:EE:FF": { +var customKeysInfo = map[string]*inf.Info{ + "~~AA:BB:CC:DD:EE:FF": { ObjectMeta: v1.ObjectMeta{ Name: "pod-1", Namespace: "ns-1", }, - Type: "Pod", - HostName: "host-1", - HostIP: "100.0.0.1", + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + NetworkName: "custom-network", }, } @@ -60,7 +64,8 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1a", }, }, - Type: "Node", + Type: "Node", + NetworkName: "primary", }, "host-2": { ObjectMeta: v1.ObjectMeta{ @@ -69,7 +74,8 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1b", }, }, - Type: "Node", + Type: "Node", + NetworkName: "primary", }, } @@ -77,8 +83,8 @@ var rules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - MacInput: "SrcMAC", + IPField: "SrcAddr", + MACField: "SrcMAC", Output: "SrcK8s", AddZone: true, }, @@ -86,8 +92,8 @@ var rules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - MacInput: "DstMAC", + IPField: "DstAddr", + MACField: "DstMAC", Output: "DstK8s", AddZone: true, }, @@ -95,7 +101,7 @@ var rules = api.NetworkTransformRules{ } func TestEnrich(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown entry := config.GenericMap{ @@ -106,16 +112,17 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "42.42.42.42", - "SrcAddr": "10.0.0.1", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "DstAddr": "42.42.42.42", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "primary", }, entry) // Pod to pod @@ -127,24 +134,26 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "10.0.0.2", - "DstK8s_HostIP": "100.0.0.2", - "DstK8s_HostName": "host-2", - "DstK8s_Name": "pod-2", - "DstK8s_Namespace": "ns-2", - "DstK8s_OwnerName": "", - "DstK8s_OwnerType": "", - "DstK8s_Type": "Pod", - "DstK8s_Zone": "us-east-1b", - "SrcAddr": "10.0.0.1", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "DstAddr": "10.0.0.2", + "DstK8s_HostIP": "100.0.0.2", + "DstK8s_HostName": "host-2", + "DstK8s_Name": "pod-2", + "DstK8s_Namespace": "ns-2", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Pod", + "DstK8s_Zone": "us-east-1b", + "DstK8s_NetworkName": "primary", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "primary", }, entry) // Pod to service @@ -156,21 +165,23 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "20.0.0.1", - "DstK8s_Name": "service-1", - "DstK8s_Namespace": "ns-1", - "DstK8s_OwnerName": "", - "DstK8s_OwnerType": "", - "DstK8s_Type": "Service", - "SrcAddr": "10.0.0.2", - "SrcK8s_HostIP": "100.0.0.2", - "SrcK8s_HostName": "host-2", - "SrcK8s_Name": "pod-2", - "SrcK8s_Namespace": "ns-2", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1b", + "DstAddr": "20.0.0.1", + "DstK8s_Name": "service-1", + "DstK8s_Namespace": "ns-1", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Service", + "DstK8s_NetworkName": "primary", + "SrcAddr": "10.0.0.2", + "SrcK8s_HostIP": "100.0.0.2", + "SrcK8s_HostName": "host-2", + "SrcK8s_Name": "pod-2", + "SrcK8s_Namespace": "ns-2", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1b", + "SrcK8s_NetworkName": "primary", }, entry) } @@ -178,7 +189,7 @@ var otelRules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "source.ip", + IPField: "source.ip", Output: "source.", Assignee: "otel", AddZone: true, @@ -187,7 +198,7 @@ var otelRules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "destination.ip", + IPField: "destination.ip", Output: "destination.", Assignee: "otel", AddZone: true, @@ -196,7 +207,7 @@ var otelRules = api.NetworkTransformRules{ } func TestEnrich_Otel(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown entry := config.GenericMap{ @@ -286,7 +297,7 @@ func TestEnrich_Otel(t *testing.T) { } func TestEnrich_EmptyNamespace(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // We need to check that, whether it returns NotFound or just an empty namespace, // there is no map entry for that namespace (an empty-valued map entry is not valid) @@ -419,27 +430,32 @@ func TestEnrichLayer(t *testing.T) { } func TestEnrichUsingMac(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown using MAC entry := config.GenericMap{ - "SrcMAC": "AA:BB:CC:DD:EE:FF", // pod-1 - "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown + "SrcAddr": "8.8.8.8", + "SrcMAC": "AA:BB:CC:DD:EE:FF", // pod-1 + "DstAddr": "9.9.9.9", + "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown } for _, r := range rules { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstMAC": "GG:HH:II:JJ:KK:LL", - "SrcMAC": "AA:BB:CC:DD:EE:FF", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "SrcAddr": "8.8.8.8", + "SrcMAC": "AA:BB:CC:DD:EE:FF", + "DstAddr": "9.9.9.9", + "DstMAC": "GG:HH:II:JJ:KK:LL", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "custom-network", }, entry) // remove the MAC rules and retry @@ -448,7 +464,7 @@ func TestEnrichUsingMac(t *testing.T) { "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown } for _, r := range rules { - r.Kubernetes.MacInput = "" + r.Kubernetes.MACField = "" Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ diff --git a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go index 273633dc0..513d32ecc 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go @@ -4,11 +4,23 @@ import ( "errors" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) +var ( + secondaryNetConfig = []api.SecondaryNetwork{ + { + Name: "my-network", + Index: map[string]any{"mac": nil}, + }, + } +) + type Mock struct { mock.Mock InformersInterface @@ -16,12 +28,12 @@ type Mock struct { func NewInformersMock() *Mock { inf := new(Mock) - inf.On("InitFromConfig", mock.Anything).Return(nil) + inf.On("InitFromConfig", mock.Anything, mock.Anything).Return(nil) return inf } -func (o *Mock) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { - args := o.Called(cfg) +func (o *Mock) InitFromConfig(cfg api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { + args := o.Called(cfg, opMetrics) return args.Error(0) } @@ -56,7 +68,7 @@ func (m *InformerMock) GetIndexer() cache.Indexer { return args.Get(0).(cache.Indexer) } -func (m *IndexerMock) MockPod(ip, mac, name, namespace, nodeIP string, owner *Owner) { +func (m *IndexerMock) MockPod(ip, mac, intf, name, namespace, nodeIP string, owner *Owner) { var ownerRef []metav1.OwnerReference if owner != nil { ownerRef = []metav1.OwnerReference{{ @@ -71,13 +83,18 @@ func (m *IndexerMock) MockPod(ip, mac, name, namespace, nodeIP string, owner *Ow Namespace: namespace, OwnerReferences: ownerRef, }, - HostIP: nodeIP, - ips: []string{}, - macs: []string{}, + HostIP: nodeIP, + ips: []string{}, + secondaryNetKeys: []string{}, } if len(mac) > 0 { - info.macs = []string{mac} - m.On("ByIndex", IndexMAC, mac).Return([]interface{}{&info}, nil) + nsi := cni.NetStatItem{ + Interface: intf, + MAC: mac, + IPs: []string{ip}, + } + info.secondaryNetKeys = nsi.Keys(secondaryNetConfig[0]) + m.On("ByIndex", IndexCustom, info.secondaryNetKeys[0]).Return([]interface{}{&info}, nil) } if len(ip) > 0 { info.ips = []string{ip} @@ -90,7 +107,6 @@ func (m *IndexerMock) MockNode(ip, name string) { Type: "Node", ObjectMeta: metav1.ObjectMeta{Name: name}, ips: []string{ip}, - macs: []string{}, }}, nil) } @@ -99,7 +115,6 @@ func (m *IndexerMock) MockService(ip, name, namespace string) { Type: "Service", ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, ips: []string{ip}, - macs: []string{}, }}, nil) } @@ -143,26 +158,26 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) { type FakeInformers struct { InformersInterface - ipInfo map[string]*Info - macInfo map[string]*Info - nodes map[string]*Info + ipInfo map[string]*Info + customKeysInfo map[string]*Info + nodes map[string]*Info } -func SetupStubs(ipInfo map[string]*Info, macInfo map[string]*Info, nodes map[string]*Info) *FakeInformers { +func SetupStubs(ipInfo map[string]*Info, customKeysInfo map[string]*Info, nodes map[string]*Info) *FakeInformers { return &FakeInformers{ - ipInfo: ipInfo, - macInfo: macInfo, - nodes: nodes, + ipInfo: ipInfo, + customKeysInfo: customKeysInfo, + nodes: nodes, } } -func (f *FakeInformers) InitFromConfig(_ api.NetworkTransformKubeConfig) error { +func (f *FakeInformers) InitFromConfig(_ api.NetworkTransformKubeConfig, _ *operational.Metrics) error { return nil } -func (f *FakeInformers) GetInfo(ip string, mac string) (*Info, error) { - if len(mac) > 0 { - i := f.macInfo[mac] +func (f *FakeInformers) GetInfo(keys []cni.SecondaryNetKey, ip string) (*Info, error) { + if len(keys) > 0 { + i := f.customKeysInfo[keys[0].Key] if i != nil { return i, nil } @@ -175,6 +190,11 @@ func (f *FakeInformers) GetInfo(ip string, mac string) (*Info, error) { return nil, errors.New("notFound") } +func (f *FakeInformers) BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey { + m := cni.MultusHandler{} + return m.BuildKeys(flow, rule, secondaryNetConfig) +} + func (f *FakeInformers) GetNodeInfo(n string) (*Info, error) { i := f.nodes[n] if i != nil { diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go index 11a195099..2c7a4a4ce 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -20,14 +20,16 @@ package informers import ( "fmt" "net" - "strings" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -41,24 +43,27 @@ import ( const ( kubeConfigEnvVariable = "KUBECONFIG" syncTime = 10 * time.Minute + IndexCustom = "byCustomKey" IndexIP = "byIP" - IndexMAC = "byMAC" TypeNode = "Node" TypePod = "Pod" TypeService = "Service" ) -var log = logrus.WithField("component", "transform.Network.Kubernetes") -var cniPlugins = map[string]cni.Plugin{ - api.OVN: &cni.OVNPlugin{}, - api.Multus: &cni.MultusPlugin{}, -} +var ( + log = logrus.WithField("component", "transform.Network.Kubernetes") + cniPlugins = map[string]cni.Plugin{ + api.OVN: &cni.OVNPlugin{}, + } + multus = cni.MultusHandler{} +) //nolint:revive type InformersInterface interface { - GetInfo(string, string) (*Info, error) + BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey + GetInfo([]cni.SecondaryNetKey, string) (*Info, error) GetNodeInfo(string) (*Info, error) - InitFromConfig(api.NetworkTransformKubeConfig) error + InitFromConfig(api.NetworkTransformKubeConfig, *operational.Metrics) error } type Informers struct { @@ -68,9 +73,12 @@ type Informers struct { nodes cache.SharedIndexInformer services cache.SharedIndexInformer // replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers - replicaSets cache.SharedIndexInformer - stopChan chan struct{} - mdStopChan chan struct{} + replicaSets cache.SharedIndexInformer + stopChan chan struct{} + mdStopChan chan struct{} + managedCNI []string + secondaryNetworks []api.SecondaryNetwork + indexerHitMetric *prometheus.CounterVec } type Owner struct { @@ -86,25 +94,30 @@ type Owner struct { type Info struct { // Informers need that internal object is an ObjectMeta instance metav1.ObjectMeta - Type string - Owner Owner - HostName string - HostIP string - ips []string - macs []string + Type string + Owner Owner + HostName string + HostIP string + NetworkName string + ips []string + secondaryNetKeys []string } -var commonIndexers = map[string]cache.IndexFunc{ - IndexIP: func(obj interface{}) ([]string, error) { +var ( + ipIndexer = func(obj interface{}) ([]string, error) { return obj.(*Info).ips, nil - }, - IndexMAC: func(obj interface{}) ([]string, error) { - return obj.(*Info).macs, nil - }, + } + customKeyIndexer = func(obj interface{}) ([]string, error) { + return obj.(*Info).secondaryNetKeys, nil + } +) + +func (k *Informers) BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey { + return multus.BuildKeys(flow, rule, k.secondaryNetworks) } -func (k *Informers) GetInfo(ip string, mac string) (*Info, error) { - if info, ok := k.fetchInformers(ip, mac); ok { +func (k *Informers) GetInfo(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, error) { + if info, ok := k.fetchInformers(potentialKeys, ip); ok { // Owner data might be discovered after the owned, so we fetch it // at the last moment if info.Owner.Name == "" { @@ -116,46 +129,80 @@ func (k *Informers) GetInfo(ip string, mac string) (*Info, error) { return nil, fmt.Errorf("informers can't find IP %s", ip) } -func (k *Informers) fetchInformers(ip string, mac string) (*Info, bool) { - if info, ok := infoForIPOrMac(k.pods.GetIndexer(), ip, mac); ok { +func (k *Informers) fetchInformers(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, bool) { + if info, ok := k.fetchPodInformer(potentialKeys, ip); ok { // it might happen that the Host is discovered after the Pod if info.HostName == "" { info.HostName = k.getHostName(info.HostIP) } return info, true } - if info, ok := infoForIPOrMac(k.nodes.GetIndexer(), ip, mac); ok { + // Nodes are only indexed by IP + if info, ok := k.infoForIP(k.nodes.GetIndexer(), "Node", ip); ok { return info, true } - if info, ok := infoForIPOrMac(k.services.GetIndexer(), ip, mac); ok { + // Services are only indexed by IP + if info, ok := k.infoForIP(k.services.GetIndexer(), "Service", ip); ok { return info, true } return nil, false } -func infoForIPOrMac(idx cache.Indexer, ip string, mac string) (*Info, bool) { - // check for mac first - if len(mac) > 0 { - objs, err := idx.ByIndex(IndexMAC, strings.ToUpper(mac)) +func (k *Informers) fetchPodInformer(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, bool) { + // 1. Check if the unique key matches any Pod (secondary networks / multus case) + if info, ok := k.infoForCustomKeys(k.pods.GetIndexer(), "Pod", potentialKeys); ok { + return info, ok + } + // 2. Check if the IP matches any Pod (primary network) + return k.infoForIP(k.pods.GetIndexer(), "Pod", ip) +} + +func (k *Informers) increaseIndexerHits(kind, namespace, network, warn string) { + k.indexerHitMetric.WithLabelValues(kind, namespace, network, warn).Inc() +} + +func (k *Informers) infoForCustomKeys(idx cache.Indexer, kind string, potentialKeys []cni.SecondaryNetKey) (*Info, bool) { + for _, key := range potentialKeys { + objs, err := idx.ByIndex(IndexCustom, key.Key) if err != nil { - log.WithError(err).WithField("mac", mac).Debug("error accessing mac index. Ignoring") + k.increaseIndexerHits(kind, "", key.NetworkName, "informer error") + log.WithError(err).WithField("key", key).Debug("error accessing unique key index, ignoring") return nil, false } if len(objs) > 0 { - log.Tracef("infoForIPOrMac found mac %v", objs[0].(*Info)) - return objs[0].(*Info), true + info := objs[0].(*Info) + info.NetworkName = key.NetworkName + if len(objs) > 1 { + k.increaseIndexerHits(kind, info.Namespace, key.NetworkName, "multiple matches") + log.WithField("key", key).Debugf("found %d objects matching this key, returning first", len(objs)) + } else { + k.increaseIndexerHits(kind, info.Namespace, key.NetworkName, "") + } + log.Tracef("infoForUniqueKey found key %v", info) + return info, true } } + return nil, false +} - // then check for ip +func (k *Informers) infoForIP(idx cache.Indexer, kind string, ip string) (*Info, bool) { objs, err := idx.ByIndex(IndexIP, ip) if err != nil { - log.WithError(err).WithField("ip", ip).Debug("error accessing ip index. Ignoring") + k.increaseIndexerHits(kind, "", "primary", "informer error") + log.WithError(err).WithField("ip", ip).Debug("error accessing IP index, ignoring") return nil, false } if len(objs) > 0 { - log.Tracef("infoForIPOrMac found ip %v", objs[0].(*Info)) - return objs[0].(*Info), true + info := objs[0].(*Info) + info.NetworkName = "primary" + if len(objs) > 1 { + k.increaseIndexerHits(kind, info.Namespace, "primary", "multiple matches") + log.WithField("ip", ip).Debugf("found %d objects matching this IP, returning first", len(objs)) + } else { + k.increaseIndexerHits(kind, info.Namespace, "primary", "") + } + log.Tracef("infoForIP found ip %v", info) + return info, true } return nil, false } @@ -203,14 +250,14 @@ func (k *Informers) getOwner(info *Info) Owner { func (k *Informers) getHostName(hostIP string) string { if hostIP != "" { - if info, ok := infoForIPOrMac(k.nodes.GetIndexer(), hostIP, ""); ok { + if info, ok := k.infoForIP(k.nodes.GetIndexer(), "Node (indirect)", hostIP); ok { return info.Name } } return "" } -func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, managedCNI []string) error { +func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory) error { nodes := informerFactory.Core().V1().Nodes().Informer() // Transform any *v1.Node instance into a *Info instance to save space // in the informer's cache @@ -231,7 +278,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, } } // CNI-dependent logic (must not fail when the CNI is not installed) - for _, name := range managedCNI { + for _, name := range k.managedCNI { if plugin := cniPlugins[name]; plugin != nil { moreIPs := plugin.GetNodeIPs(node) if moreIPs != nil { @@ -242,12 +289,10 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, return &Info{ ObjectMeta: metav1.ObjectMeta{ - Name: node.Name, - Namespace: "", - Labels: node.Labels, + Name: node.Name, + Labels: node.Labels, }, ips: ips, - macs: []string{}, Type: TypeNode, // We duplicate HostIP and HostName information to simplify later filtering e.g. by // Host IP, where we want to get all the Pod flows by src/dst host, but also the actual @@ -258,14 +303,15 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, }); err != nil { return fmt.Errorf("can't set nodes transform: %w", err) } - if err := nodes.AddIndexers(commonIndexers); err != nil { + indexers := cache.Indexers{IndexIP: ipIndexer} + if err := nodes.AddIndexers(indexers); err != nil { return fmt.Errorf("can't add %s indexer to Nodes informer: %w", IndexIP, err) } k.nodes = nodes return nil } -func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, managedCNI []string) error { +func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory) error { pods := informerFactory.Core().V1().Pods().Informer() // Transform any *v1.Pod instance into a *Info instance to save space // in the informer's cache @@ -275,25 +321,19 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, m return nil, fmt.Errorf("was expecting a Pod. Got: %T", i) } ips := make([]string, 0, len(pod.Status.PodIPs)) - macs := []string{} for _, ip := range pod.Status.PodIPs { // ignoring host-networked Pod IPs if ip.IP != pod.Status.HostIP { ips = append(ips, ip.IP) } } - // CNI-dependent logic (must not fail when the CNI is not installed) - for _, name := range managedCNI { - if plugin := cniPlugins[name]; plugin != nil { - moreIPs, moreMacs := plugin.GetPodIPsAndMACs(pod) - if moreIPs != nil { - ips = append(ips, moreIPs...) - } - if moreMacs != nil { - macs = append(macs, moreMacs...) - } - } + // Index from secondary network info + keys, err := multus.GetPodUniqueKeys(pod, k.secondaryNetworks) + if err != nil { + // Log the error as Info, do not block other ips indexing + log.WithError(err).Infof("Secondary network cannot be identified") } + return &Info{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, @@ -301,17 +341,21 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, m Labels: pod.Labels, OwnerReferences: pod.OwnerReferences, }, - Type: TypePod, - HostIP: pod.Status.HostIP, - HostName: pod.Spec.NodeName, - ips: ips, - macs: macs, + Type: TypePod, + HostIP: pod.Status.HostIP, + HostName: pod.Spec.NodeName, + secondaryNetKeys: keys, + ips: ips, }, nil }); err != nil { return fmt.Errorf("can't set pods transform: %w", err) } - if err := pods.AddIndexers(commonIndexers); err != nil { - return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) + indexers := cache.Indexers{ + IndexIP: ipIndexer, + IndexCustom: customKeyIndexer, + } + if err := pods.AddIndexers(indexers); err != nil { + return fmt.Errorf("can't add indexers to Pods informer: %w", err) } k.pods = pods @@ -342,12 +386,12 @@ func (k *Informers) initServiceInformer(informerFactory inf.SharedInformerFactor }, Type: TypeService, ips: ips, - macs: []string{}, }, nil }); err != nil { return fmt.Errorf("can't set services transform: %w", err) } - if err := services.AddIndexers(commonIndexers); err != nil { + indexers := cache.Indexers{IndexIP: ipIndexer} + if err := services.AddIndexers(indexers); err != nil { return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) } @@ -380,7 +424,7 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar return nil } -func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { +func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { // Initialization variables k.stopChan = make(chan struct{}) k.mdStopChan = make(chan struct{}) @@ -400,11 +444,13 @@ func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { return err } - cnis := cfg.ManagedCNI - if cnis == nil { - cnis = []string{api.OVN, api.Multus} + k.managedCNI = cfg.ManagedCNI + if k.managedCNI == nil { + k.managedCNI = []string{api.OVN} } - err = k.initInformers(kubeClient, metaKubeClient, cnis) + k.secondaryNetworks = cfg.SecondaryNetworks + k.indexerHitMetric = opMetrics.CreateIndexerHitCounter() + err = k.initInformers(kubeClient, metaKubeClient) if err != nil { return err } @@ -412,14 +458,14 @@ func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { return nil } -func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, managedCNI []string) error { +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) - err := k.initNodeInformer(informerFactory, managedCNI) + err := k.initNodeInformer(informerFactory) if err != nil { return err } - err = k.initPodInformer(informerFactory, managedCNI) + err = k.initPodInformer(informerFactory) if err != nil { return err } diff --git a/pkg/pipeline/transform/kubernetes/informers/informers_test.go b/pkg/pipeline/transform/kubernetes/informers/informers_test.go index 09a6c2d84..0de4ba5d6 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers_test.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers_test.go @@ -20,15 +20,19 @@ package informers import ( "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestGetInfo(t *testing.T) { - kubeData := Informers{} + metrics := operational.NewMetrics(&config.MetricsSettings{}) + kubeData := Informers{indexerHitMetric: metrics.CreateIndexerHitCounter()} pidx, hidx, sidx, ridx := SetupIndexerMocks(&kubeData) - pidx.MockPod("1.2.3.4", "AA:BB:CC:DD:EE:FF", "pod1", "podNamespace", "10.0.0.1", nil) - pidx.MockPod("1.2.3.5", "", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) + pidx.MockPod("1.2.3.4", "AA:BB:CC:DD:EE:FF", "eth0", "pod1", "podNamespace", "10.0.0.1", nil) + pidx.MockPod("1.2.3.5", "", "", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) pidx.FallbackNotFound() ridx.MockReplicaSet("rs1", "podNamespace", Owner{Name: "dep1", Type: "Deployment"}) ridx.FallbackNotFound() @@ -38,7 +42,7 @@ func TestGetInfo(t *testing.T) { hidx.FallbackNotFound() // Test get orphan pod - info, err := kubeData.GetInfo("1.2.3.4", "") + info, err := kubeData.GetInfo(nil, "1.2.3.4") require.NoError(t, err) pod1 := Info{ Type: "Pod", @@ -46,24 +50,26 @@ func TestGetInfo(t *testing.T) { Name: "pod1", Namespace: "podNamespace", }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "pod1", Type: "Pod"}, - ips: []string{"1.2.3.4"}, - macs: []string{"AA:BB:CC:DD:EE:FF"}, + HostName: "node1", + HostIP: "10.0.0.1", + Owner: Owner{Name: "pod1", Type: "Pod"}, + NetworkName: "primary", + ips: []string{"1.2.3.4"}, + secondaryNetKeys: []string{"~~AA:BB:CC:DD:EE:FF"}, } - require.Equal(t, *info, pod1) + require.Equal(t, pod1, *info) // Test get same pod by mac - info, err = kubeData.GetInfo("", "AA:BB:CC:DD:EE:FF") + info, err = kubeData.GetInfo([]cni.SecondaryNetKey{{NetworkName: "custom-network", Key: "~~AA:BB:CC:DD:EE:FF"}}, "") require.NoError(t, err) - require.Equal(t, *info, pod1) + pod1.NetworkName = "custom-network" + require.Equal(t, pod1, *info) // Test get pod owned - info, err = kubeData.GetInfo("1.2.3.5", "") + info, err = kubeData.GetInfo(nil, "1.2.3.5") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Pod", ObjectMeta: metav1.ObjectMeta{ Name: "pod2", @@ -73,44 +79,45 @@ func TestGetInfo(t *testing.T) { Name: "rs1", }}, }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "dep1", Type: "Deployment"}, - ips: []string{"1.2.3.5"}, - macs: []string{}, - }) + HostName: "node1", + HostIP: "10.0.0.1", + Owner: Owner{Name: "dep1", Type: "Deployment"}, + NetworkName: "primary", + ips: []string{"1.2.3.5"}, + secondaryNetKeys: []string{}, + }, *info) // Test get node - info, err = kubeData.GetInfo("10.0.0.1", "") + info, err = kubeData.GetInfo(nil, "10.0.0.1") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Node", ObjectMeta: metav1.ObjectMeta{ Name: "node1", }, - Owner: Owner{Name: "node1", Type: "Node"}, - ips: []string{"10.0.0.1"}, - macs: []string{}, - }) + Owner: Owner{Name: "node1", Type: "Node"}, + NetworkName: "primary", + ips: []string{"10.0.0.1"}, + }, *info) // Test get service - info, err = kubeData.GetInfo("1.2.3.100", "") + info, err = kubeData.GetInfo(nil, "1.2.3.100") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Service", ObjectMeta: metav1.ObjectMeta{ Name: "svc1", Namespace: "svcNamespace", }, - Owner: Owner{Name: "svc1", Type: "Service"}, - ips: []string{"1.2.3.100"}, - macs: []string{}, - }) + Owner: Owner{Name: "svc1", Type: "Service"}, + NetworkName: "primary", + ips: []string{"1.2.3.100"}, + }, *info) // Test no match - info, err = kubeData.GetInfo("1.2.3.200", "") + info, err = kubeData.GetInfo(nil, "1.2.3.200") require.NotNil(t, err) require.Nil(t, info) } diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index e8ec0c2c5..b79e6759c 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -26,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb" @@ -166,7 +167,7 @@ func (n *Network) applySubnetLabel(strIP string) string { // NewTransformNetwork create a new transform // //nolint:cyclop -func NewTransformNetwork(params config.StageParam) (Transformer, error) { +func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metrics) (Transformer, error) { var needToInitLocationDB = false var needToInitKubeData = false var needToInitNetworkServices = false @@ -205,7 +206,7 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } if needToInitKubeData { - err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfig) + err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfig, opMetrics) if err != nil { return nil, err } diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 20e2bc700..4738dc9bc 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -211,7 +211,7 @@ func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { v, cfg := test.InitConfig(t, configFile) require.NotNil(t, v) config := cfg.Parameters[0] - newTransform, err := NewTransformNetwork(config) + newTransform, err := NewTransformNetwork(config, nil) require.NoError(t, err) return newTransform } @@ -246,7 +246,7 @@ func Test_Categorize(t *testing.T) { }, } - tr, err := NewTransformNetwork(cfg) + tr, err := NewTransformNetwork(cfg, nil) require.NoError(t, err) output, ok := tr.Transform(entry) @@ -280,7 +280,7 @@ func Test_ReinterpretDirection(t *testing.T) { }, } - tr, err := NewTransformNetwork(cfg) + tr, err := NewTransformNetwork(cfg, nil) require.NoError(t, err) output, ok := tr.Transform(config.GenericMap{ @@ -430,7 +430,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing ReporterIPField") // Missing src field @@ -448,7 +448,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing SrcHostField") // Missing dst field @@ -466,7 +466,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing DstHostField") // Missing flow direction field @@ -484,7 +484,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing FlowDirectionField") // Missing if direction field does not trigger an error (this field will just not be populated) @@ -502,7 +502,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.NoError(t, err) // Test transformation when IfDirection is missing diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 8ea125330..166e1cfeb 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -69,7 +69,7 @@ data: subnet_mask: /16 - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S labels_prefix: srcK8S_labels - type: add_location