diff --git a/README.md b/README.md index 4ca444be6..eb4dc339b 100644 --- a/README.md +++ b/README.md @@ -422,7 +422,7 @@ parameters: output: dstLocation - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S ``` @@ -443,7 +443,7 @@ All the geo-location fields will be named by appending `output` value (e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`) The rule `add_kubernetes` generates new fields with kubernetes information by -matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs. +matching the `ipField` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs. All the kubernetes fields will be named by appending `output` value (`srcK8S` in the example above) to the kubernetes metadata field names (e.g., `Namespace`, `Name`, `Type`, `HostIP`, `OwnerName`, `OwnerType` ) diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index dded3bb69..2f0a3c8ae 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -52,7 +52,7 @@ func TestPipelineConfigSetup(t *testing.T) { js := `{ "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", - "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", + "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"ipField\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"ipField\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", "Health": { "Port": "8080" }, diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index f597decea..e09e62d9e 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -144,7 +144,7 @@ parameters: subnet_mask: /16 - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S labels_prefix: srcK8S_labels - type: add_location diff --git a/docs/api.md b/docs/api.md index 13a192d43..19fb3f54a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -250,8 +250,9 @@ Following is the supported API format for network transformations: name: name of the object namespace: namespace of the object kubernetes: Kubernetes rule configuration - input: entry IP input field - mac-input: Optional entry MAC input field + ipField: entry IP input field + interfacesField: entry Interfaces input field + macField: entry MAC input field output: entry output field assignee: value needs to assign to output field labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied @@ -272,7 +273,10 @@ Following is the supported API format for network transformations: protocol: entry protocol field kubeConfig: global configuration related to Kubernetes (optional) configPath: path to kubeconfig file (optional) - managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus + secondaryNetworks: configuration for secondary networks + name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status' + index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface' + managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn servicesFile: path to services file (optional, default: /etc/services) protocolsFile: path to protocols file (optional, default: /etc/protocols) subnetLabels: configure subnet and IPs custom labels diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 641d265f2..e79e7e556 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -143,6 +143,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | stage | +### secondary_network_indexer_hit +| **Name** | secondary_network_indexer_hit | +|:---|:---| +| **Description** | Counter of hits per secondary network index for Kubernetes enrichment | +| **Type** | counter | +| **Labels** | kind, network, warning | + + ### stage_duration_ms | **Name** | stage_duration_ms | |:---|:---| diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 8c60c0aa8..baf922230 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -39,13 +39,13 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) { } const ( - OVN = "ovn" - Multus = "multus" + OVN = "ovn" ) type NetworkTransformKubeConfig struct { - ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"` - ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus"` + ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"` + SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"` + ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"` } type TransformNetworkOperationEnum string @@ -84,12 +84,18 @@ type K8sReference struct { } type K8sRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry IP input field"` - MacInput string `yaml:"mac-input,omitempty" json:"mac-input,omitempty" doc:"Optional entry MAC input field"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` - LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` - AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"` + IPField string `yaml:"ipField,omitempty" json:"ipField,omitempty" doc:"entry IP input field"` + InterfacesField string `yaml:"interfacesField,omitempty" json:"interfacesField,omitempty" doc:"entry Interfaces input field"` + MACField string `yaml:"macField,omitempty" json:"macField,omitempty" doc:"entry MAC input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` + LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"` + AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"` +} + +type SecondaryNetwork struct { + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'"` + Index map[string]any `yaml:"index,omitempty" json:"index,omitempty" doc:"fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'"` } type NetworkGenericRule struct { diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 70bddae15..e7a59367f 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -26,15 +26,15 @@ import ( func Test_dedupeNetworkTransformRules(t *testing.T) { slice := api.NetworkTransformRules{ - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, } expected := api.NetworkTransformRules{ - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}}, - api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}}, + api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}}, } actual := dedupeNetworkTransformRules(slice) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 8b10142fb..2f8d8d357 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -31,14 +31,14 @@ func TestLokiPipeline(t *testing.T) { pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - Output: "SrcK8S", + IPField: "SrcAddr", + Output: "SrcK8S", }, }, { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - Output: "DstK8S", + IPField: "DstAddr", + Output: "DstK8S", }, }}}) pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"}) @@ -58,7 +58,7 @@ func TestLokiPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -206,14 +206,14 @@ func TestIPFIXPipeline(t *testing.T) { pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - Output: "SrcK8S", + IPField: "SrcAddr", + Output: "SrcK8S", }, }, { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - Output: "DstK8S", + IPField: "DstAddr", + Output: "DstK8S", }, }}}) pl = pl.WriteIpfix("ipfix", api.WriteIpfix{ @@ -238,7 +238,7 @@ func TestIPFIXPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) diff --git a/pkg/operational/metrics.go b/pkg/operational/metrics.go index 44488f232..d0008aa3f 100644 --- a/pkg/operational/metrics.go +++ b/pkg/operational/metrics.go @@ -80,6 +80,15 @@ var ( TypeHistogram, "stage", ) + indexerHit = DefineMetric( + "secondary_network_indexer_hit", + "Counter of hits per secondary network index for Kubernetes enrichment", + TypeCounter, + "kind", + "namespace", + "network", + "warning", + ) ) func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels { @@ -241,6 +250,10 @@ func (o *Metrics) GetOrCreateStageDurationHisto() *prometheus.HistogramVec { return o.stageDurationHisto } +func (o *Metrics) CreateIndexerHitCounter() *prometheus.CounterVec { + return o.NewCounterVec(&indexerHit) +} + func GetDocumentation() string { doc := "" sort.Slice(allMetrics, func(i, j int) bool { diff --git a/pkg/pipeline/pipeline_builder.go b/pkg/pipeline/pipeline_builder.go index dfa10cce8..cfffc894f 100644 --- a/pkg/pipeline/pipeline_builder.go +++ b/pkg/pipeline/pipeline_builder.go @@ -434,7 +434,7 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write. return writer, err } -func getTransformer(_ *operational.Metrics, params config.StageParam) (transform.Transformer, error) { +func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (transform.Transformer, error) { var transformer transform.Transformer var err error switch params.Transform.Type { @@ -443,7 +443,7 @@ func getTransformer(_ *operational.Metrics, params config.StageParam) (transform case api.FilterType: transformer, err = transform.NewTransformFilter(params) case api.NetworkType: - transformer, err = transform.NewTransformNetwork(params) + transformer, err = transform.NewTransformNetwork(params, opMetrics) case api.NoneType: transformer, err = transform.NewTransformNone() default: diff --git a/pkg/pipeline/transform/kubernetes/cni/cni.go b/pkg/pipeline/transform/kubernetes/cni/cni.go index ef787a74d..05533886d 100644 --- a/pkg/pipeline/transform/kubernetes/cni/cni.go +++ b/pkg/pipeline/transform/kubernetes/cni/cni.go @@ -6,5 +6,4 @@ import ( type Plugin interface { GetNodeIPs(node *v1.Node) []string - GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string) } diff --git a/pkg/pipeline/transform/kubernetes/cni/multus.go b/pkg/pipeline/transform/kubernetes/cni/multus.go index 2fd073377..b37d45ea9 100644 --- a/pkg/pipeline/transform/kubernetes/cni/multus.go +++ b/pkg/pipeline/transform/kubernetes/cni/multus.go @@ -5,59 +5,130 @@ import ( "fmt" "strings" - log "github.com/sirupsen/logrus" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" v1 "k8s.io/api/core/v1" ) const ( statusAnnotation = "k8s.v1.cni.cncf.io/network-status" + // Index names + indexIP = "ip" + indexMAC = "mac" + indexInterface = "interface" ) -type MultusPlugin struct { - Plugin +type MultusHandler struct { } -func (m *MultusPlugin) GetNodeIPs(_ *v1.Node) []string { - // No CNI-specific logic needed for pods - return nil +type SecondaryNetKey struct { + NetworkName string + Key string } -func (m *MultusPlugin) GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string) { - // Cf https://k8snetworkplumbingwg.github.io/multus-cni/docs/quickstart.html#network-status-annotations - ips, macs, err := extractNetStatusIPsAndMACs(pod.Annotations) - if err != nil { - // Log the error as Info, do not block other ips indexing - log.Infof("failed to index IPs from network-status annotation: %v", err) +func (m *MultusHandler) BuildKeys(flow config.GenericMap, rule *api.K8sRule, secNets []api.SecondaryNetwork) []SecondaryNetKey { + if len(secNets) == 0 { + return nil + } + var keys []SecondaryNetKey + for _, sn := range secNets { + snKeys := m.buildSNKeys(flow, rule, &sn) + if snKeys != nil { + keys = append(keys, snKeys...) + } } - log.Tracef("GetPodIPsAndMACs found ips: %v macs: %v for pod %s", ips, macs, pod.Name) - return ips, macs + return keys } -type netStatItem struct { - IPs []string `json:"ips"` - MAC string `json:"mac"` -} +func (m *MultusHandler) buildSNKeys(flow config.GenericMap, rule *api.K8sRule, sn *api.SecondaryNetwork) []SecondaryNetKey { + var keys []SecondaryNetKey -func extractNetStatusIPsAndMACs(annotations map[string]string) ([]string, []string, error) { - if statusAnnotationJSON, ok := annotations[statusAnnotation]; ok { - var ips, macs []string - var networks []netStatItem - err := json.Unmarshal([]byte(statusAnnotationJSON), &networks) - if err == nil { - for _, network := range networks { - if len(network.IPs) > 0 { - ips = append(ips, network.IPs...) - } + var ip, mac string + var interfaces []string + if _, ok := sn.Index[indexIP]; ok && len(rule.IPField) > 0 { + ip, ok = flow.LookupString(rule.IPField) + if !ok { + return nil + } + } + if _, ok := sn.Index[indexMAC]; ok && len(rule.MACField) > 0 { + mac, ok = flow.LookupString(rule.MACField) + if !ok { + return nil + } + } + if _, ok := sn.Index[indexInterface]; ok && len(rule.InterfacesField) > 0 { + v, ok := flow[rule.InterfacesField] + if !ok { + return nil + } + interfaces, ok = v.([]string) + if !ok { + return nil + } + } - if len(network.MAC) > 0 { - macs = append(macs, strings.ToUpper(network.MAC)) + macIP := "~" + ip + "~" + mac + if interfaces == nil { + return []SecondaryNetKey{{NetworkName: sn.Name, Key: macIP}} + } + for _, intf := range interfaces { + keys = append(keys, SecondaryNetKey{NetworkName: sn.Name, Key: intf + macIP}) + } + + return keys +} + +func (m *MultusHandler) GetPodUniqueKeys(pod *v1.Pod, secNets []api.SecondaryNetwork) ([]string, error) { + if len(secNets) == 0 { + return nil, nil + } + // Cf https://k8snetworkplumbingwg.github.io/multus-cni/docs/quickstart.html#network-status-annotations + if statusAnnotationJSON, ok := pod.Annotations[statusAnnotation]; ok { + var networks []NetStatItem + if err := json.Unmarshal([]byte(statusAnnotationJSON), &networks); err != nil { + return nil, fmt.Errorf("failed to index from network-status annotation, cannot read annotation %s: %w", statusAnnotation, err) + } + var keys []string + for _, network := range networks { + for _, snConfig := range secNets { + if snConfig.Name == network.Name { + keys = append(keys, network.Keys(snConfig)...) } } - return ips, macs, nil } - - return nil, nil, fmt.Errorf("cannot read annotation %s: %w", statusAnnotation, err) + return keys, nil } // Annotation not present => just ignore, no error - return nil, nil, nil + return nil, nil +} + +type NetStatItem struct { + Name string `json:"name"` + Interface string `json:"interface"` + IPs []string `json:"ips"` + MAC string `json:"mac"` +} + +func (n *NetStatItem) Keys(snConfig api.SecondaryNetwork) []string { + var mac, intf string + if _, ok := snConfig.Index[indexMAC]; ok { + mac = n.MAC + } + if _, ok := snConfig.Index[indexInterface]; ok { + intf = n.Interface + } + if _, ok := snConfig.Index[indexIP]; ok { + var keys []string + for _, ip := range n.IPs { + keys = append(keys, key(intf, ip, mac)) + } + return keys + } + // Ignore IP + return []string{key(intf, "", mac)} +} + +func key(intf, ip, mac string) string { + return intf + "~" + ip + "~" + strings.ToUpper(mac) } diff --git a/pkg/pipeline/transform/kubernetes/cni/multus_test.go b/pkg/pipeline/transform/kubernetes/cni/multus_test.go index 69772b00e..88481d46e 100644 --- a/pkg/pipeline/transform/kubernetes/cni/multus_test.go +++ b/pkg/pipeline/transform/kubernetes/cni/multus_test.go @@ -3,27 +3,38 @@ package cni import ( "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestExtractNetStatusIPs(t *testing.T) { - // Annotation not found => no error, no ip - ip, mac, err := extractNetStatusIPsAndMACs(map[string]string{}) +var ( + multusHandler = MultusHandler{} + pod = v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}} + secondaryNetConfig = []api.SecondaryNetwork{ + { + Name: "macvlan-conf", + Index: map[string]any{"mac": nil}, + }, + } +) + +func TestExtractNetStatusKeys(t *testing.T) { + // Annotation not found => no error, no key + keys, err := multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.NoError(t, err) - require.Empty(t, ip) - require.Empty(t, mac) + require.Empty(t, keys) - // Annotation malformed => error, no ip - ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{ - statusAnnotation: "whatever", - }) + // Annotation malformed => error, no key + pod.Annotations = map[string]string{statusAnnotation: "whatever"} + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.Error(t, err) require.Contains(t, err.Error(), "cannot read annotation") - require.Empty(t, ip) - require.Empty(t, mac) + require.Empty(t, keys) - // Valid annotation => no error, ip - ip, mac, err = extractNetStatusIPsAndMACs(map[string]string{ + // Valid annotation => no error, key + pod.Annotations = map[string]string{ statusAnnotation: ` [{ "name": "cbr0", @@ -31,6 +42,7 @@ func TestExtractNetStatusIPs(t *testing.T) { "10.244.1.73" ], "default": true, + "mac": "aa:aa:96:ff:aa:aa", "dns": {} },{ "name": "macvlan-conf", @@ -42,9 +54,14 @@ func TestExtractNetStatusIPs(t *testing.T) { "dns": {} }] `, - }) + } + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) require.NoError(t, err) - require.Equal(t, []string{"10.244.1.73", "192.168.1.205"}, ip) - require.Equal(t, []string{"86:1D:96:FF:55:0D"}, mac) + require.Equal(t, []string{"~~86:1D:96:FF:55:0D"}, keys) + // Composed key + secondaryNetConfig[0].Index = map[string]any{"mac": nil, "ip": nil, "interface": nil} + keys, err = multusHandler.GetPodUniqueKeys(&pod, secondaryNetConfig) + require.NoError(t, err) + require.Equal(t, []string{"net1~192.168.1.205~86:1D:96:FF:55:0D"}, keys) } diff --git a/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go b/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go index 4ce4fd7c9..ae5701de1 100644 --- a/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go +++ b/pkg/pipeline/transform/kubernetes/cni/ovn_kubernetes.go @@ -47,11 +47,6 @@ func (o *OVNPlugin) GetNodeIPs(node *v1.Node) []string { return nil } -func (o *OVNPlugin) GetPodIPsAndMACs(_ *v1.Pod) ([]string, []string) { - // No CNI-specific logic needed for pods - return nil, nil -} - func unmarshalOVNAnnotation(annot []byte) (string, error) { // Depending on OVN (OCP) version, the annotation might be JSON-encoded as a string (legacy), or an array of strings var subnetsAsArray map[string][]string diff --git a/pkg/pipeline/transform/kubernetes/enrich.go b/pkg/pipeline/transform/kubernetes/enrich.go index 702e913cb..899e2f2ca 100644 --- a/pkg/pipeline/transform/kubernetes/enrich.go +++ b/pkg/pipeline/transform/kubernetes/enrich.go @@ -5,6 +5,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" inf "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" "github.com/sirupsen/logrus" ) @@ -16,23 +17,19 @@ func MockInformers() { informers = inf.NewInformersMock() } -func InitFromConfig(config api.NetworkTransformKubeConfig) error { - return informers.InitFromConfig(config) +func InitFromConfig(config api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { + return informers.InitFromConfig(config, opMetrics) } func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) { - ip, _ := outputEntry.LookupString(rule.Input) - var mac string - if len(rule.MacInput) > 0 { - mac, _ = outputEntry.LookupString(rule.MacInput) - } - logrus.Tracef("Enrich IP %s MAC %s", ip, mac) - if ip == "" && mac == "" { + ip, ok := outputEntry.LookupString(rule.IPField) + if !ok { return } - kubeInfo, err := informers.GetInfo(ip, mac) + potentialKeys := informers.BuildSecondaryNetworkKeys(outputEntry, rule) + kubeInfo, err := informers.GetInfo(potentialKeys, ip) if err != nil { - logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input]) + logrus.WithError(err).Tracef("can't find kubernetes info for keys %v and IP %s", potentialKeys, ip) return } if rule.Assignee != "otel" { @@ -45,6 +42,7 @@ func Enrich(outputEntry config.GenericMap, rule *api.K8sRule) { outputEntry[rule.Output+"_Type"] = kubeInfo.Type outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type + outputEntry[rule.Output+"_NetworkName"] = kubeInfo.NetworkName if rule.LabelsPrefix != "" { for labelKey, labelValue := range kubeInfo.Labels { outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index 634e631ae..3a2566400 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -18,37 +18,41 @@ var ipInfo = map[string]*inf.Info{ Name: "pod-1", Namespace: "ns-1", }, - Type: "Pod", - HostName: "host-1", - HostIP: "100.0.0.1", + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + NetworkName: "primary", }, "10.0.0.2": { ObjectMeta: v1.ObjectMeta{ Name: "pod-2", Namespace: "ns-2", }, - Type: "Pod", - HostName: "host-2", - HostIP: "100.0.0.2", + Type: "Pod", + HostName: "host-2", + HostIP: "100.0.0.2", + NetworkName: "primary", }, "20.0.0.1": { ObjectMeta: v1.ObjectMeta{ Name: "service-1", Namespace: "ns-1", }, - Type: "Service", + Type: "Service", + NetworkName: "primary", }, } -var macInfo = map[string]*inf.Info{ - "AA:BB:CC:DD:EE:FF": { +var customKeysInfo = map[string]*inf.Info{ + "~~AA:BB:CC:DD:EE:FF": { ObjectMeta: v1.ObjectMeta{ Name: "pod-1", Namespace: "ns-1", }, - Type: "Pod", - HostName: "host-1", - HostIP: "100.0.0.1", + Type: "Pod", + HostName: "host-1", + HostIP: "100.0.0.1", + NetworkName: "custom-network", }, } @@ -60,7 +64,8 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1a", }, }, - Type: "Node", + Type: "Node", + NetworkName: "primary", }, "host-2": { ObjectMeta: v1.ObjectMeta{ @@ -69,7 +74,8 @@ var nodes = map[string]*inf.Info{ nodeZoneLabelName: "us-east-1b", }, }, - Type: "Node", + Type: "Node", + NetworkName: "primary", }, } @@ -77,8 +83,8 @@ var rules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "SrcAddr", - MacInput: "SrcMAC", + IPField: "SrcAddr", + MACField: "SrcMAC", Output: "SrcK8s", AddZone: true, }, @@ -86,8 +92,8 @@ var rules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "DstAddr", - MacInput: "DstMAC", + IPField: "DstAddr", + MACField: "DstMAC", Output: "DstK8s", AddZone: true, }, @@ -95,7 +101,7 @@ var rules = api.NetworkTransformRules{ } func TestEnrich(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown entry := config.GenericMap{ @@ -106,16 +112,17 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "42.42.42.42", - "SrcAddr": "10.0.0.1", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "DstAddr": "42.42.42.42", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "primary", }, entry) // Pod to pod @@ -127,24 +134,26 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "10.0.0.2", - "DstK8s_HostIP": "100.0.0.2", - "DstK8s_HostName": "host-2", - "DstK8s_Name": "pod-2", - "DstK8s_Namespace": "ns-2", - "DstK8s_OwnerName": "", - "DstK8s_OwnerType": "", - "DstK8s_Type": "Pod", - "DstK8s_Zone": "us-east-1b", - "SrcAddr": "10.0.0.1", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "DstAddr": "10.0.0.2", + "DstK8s_HostIP": "100.0.0.2", + "DstK8s_HostName": "host-2", + "DstK8s_Name": "pod-2", + "DstK8s_Namespace": "ns-2", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Pod", + "DstK8s_Zone": "us-east-1b", + "DstK8s_NetworkName": "primary", + "SrcAddr": "10.0.0.1", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "primary", }, entry) // Pod to service @@ -156,21 +165,23 @@ func TestEnrich(t *testing.T) { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstAddr": "20.0.0.1", - "DstK8s_Name": "service-1", - "DstK8s_Namespace": "ns-1", - "DstK8s_OwnerName": "", - "DstK8s_OwnerType": "", - "DstK8s_Type": "Service", - "SrcAddr": "10.0.0.2", - "SrcK8s_HostIP": "100.0.0.2", - "SrcK8s_HostName": "host-2", - "SrcK8s_Name": "pod-2", - "SrcK8s_Namespace": "ns-2", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1b", + "DstAddr": "20.0.0.1", + "DstK8s_Name": "service-1", + "DstK8s_Namespace": "ns-1", + "DstK8s_OwnerName": "", + "DstK8s_OwnerType": "", + "DstK8s_Type": "Service", + "DstK8s_NetworkName": "primary", + "SrcAddr": "10.0.0.2", + "SrcK8s_HostIP": "100.0.0.2", + "SrcK8s_HostName": "host-2", + "SrcK8s_Name": "pod-2", + "SrcK8s_Namespace": "ns-2", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1b", + "SrcK8s_NetworkName": "primary", }, entry) } @@ -178,7 +189,7 @@ var otelRules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "source.ip", + IPField: "source.ip", Output: "source.", Assignee: "otel", AddZone: true, @@ -187,7 +198,7 @@ var otelRules = api.NetworkTransformRules{ { Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ - Input: "destination.ip", + IPField: "destination.ip", Output: "destination.", Assignee: "otel", AddZone: true, @@ -196,7 +207,7 @@ var otelRules = api.NetworkTransformRules{ } func TestEnrich_Otel(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown entry := config.GenericMap{ @@ -286,7 +297,7 @@ func TestEnrich_Otel(t *testing.T) { } func TestEnrich_EmptyNamespace(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // We need to check that, whether it returns NotFound or just an empty namespace, // there is no map entry for that namespace (an empty-valued map entry is not valid) @@ -419,27 +430,32 @@ func TestEnrichLayer(t *testing.T) { } func TestEnrichUsingMac(t *testing.T) { - informers = inf.SetupStubs(ipInfo, macInfo, nodes) + informers = inf.SetupStubs(ipInfo, customKeysInfo, nodes) // Pod to unknown using MAC entry := config.GenericMap{ - "SrcMAC": "AA:BB:CC:DD:EE:FF", // pod-1 - "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown + "SrcAddr": "8.8.8.8", + "SrcMAC": "AA:BB:CC:DD:EE:FF", // pod-1 + "DstAddr": "9.9.9.9", + "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown } for _, r := range rules { Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ - "DstMAC": "GG:HH:II:JJ:KK:LL", - "SrcMAC": "AA:BB:CC:DD:EE:FF", - "SrcK8s_HostIP": "100.0.0.1", - "SrcK8s_HostName": "host-1", - "SrcK8s_Name": "pod-1", - "SrcK8s_Namespace": "ns-1", - "SrcK8s_OwnerName": "", - "SrcK8s_OwnerType": "", - "SrcK8s_Type": "Pod", - "SrcK8s_Zone": "us-east-1a", + "SrcAddr": "8.8.8.8", + "SrcMAC": "AA:BB:CC:DD:EE:FF", + "DstAddr": "9.9.9.9", + "DstMAC": "GG:HH:II:JJ:KK:LL", + "SrcK8s_HostIP": "100.0.0.1", + "SrcK8s_HostName": "host-1", + "SrcK8s_Name": "pod-1", + "SrcK8s_Namespace": "ns-1", + "SrcK8s_OwnerName": "", + "SrcK8s_OwnerType": "", + "SrcK8s_Type": "Pod", + "SrcK8s_Zone": "us-east-1a", + "SrcK8s_NetworkName": "custom-network", }, entry) // remove the MAC rules and retry @@ -448,7 +464,7 @@ func TestEnrichUsingMac(t *testing.T) { "DstMAC": "GG:HH:II:JJ:KK:LL", // unknown } for _, r := range rules { - r.Kubernetes.MacInput = "" + r.Kubernetes.MACField = "" Enrich(entry, r.Kubernetes) } assert.Equal(t, config.GenericMap{ diff --git a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go index 273633dc0..513d32ecc 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers-mock.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers-mock.go @@ -4,11 +4,23 @@ import ( "errors" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/stretchr/testify/mock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) +var ( + secondaryNetConfig = []api.SecondaryNetwork{ + { + Name: "my-network", + Index: map[string]any{"mac": nil}, + }, + } +) + type Mock struct { mock.Mock InformersInterface @@ -16,12 +28,12 @@ type Mock struct { func NewInformersMock() *Mock { inf := new(Mock) - inf.On("InitFromConfig", mock.Anything).Return(nil) + inf.On("InitFromConfig", mock.Anything, mock.Anything).Return(nil) return inf } -func (o *Mock) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { - args := o.Called(cfg) +func (o *Mock) InitFromConfig(cfg api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { + args := o.Called(cfg, opMetrics) return args.Error(0) } @@ -56,7 +68,7 @@ func (m *InformerMock) GetIndexer() cache.Indexer { return args.Get(0).(cache.Indexer) } -func (m *IndexerMock) MockPod(ip, mac, name, namespace, nodeIP string, owner *Owner) { +func (m *IndexerMock) MockPod(ip, mac, intf, name, namespace, nodeIP string, owner *Owner) { var ownerRef []metav1.OwnerReference if owner != nil { ownerRef = []metav1.OwnerReference{{ @@ -71,13 +83,18 @@ func (m *IndexerMock) MockPod(ip, mac, name, namespace, nodeIP string, owner *Ow Namespace: namespace, OwnerReferences: ownerRef, }, - HostIP: nodeIP, - ips: []string{}, - macs: []string{}, + HostIP: nodeIP, + ips: []string{}, + secondaryNetKeys: []string{}, } if len(mac) > 0 { - info.macs = []string{mac} - m.On("ByIndex", IndexMAC, mac).Return([]interface{}{&info}, nil) + nsi := cni.NetStatItem{ + Interface: intf, + MAC: mac, + IPs: []string{ip}, + } + info.secondaryNetKeys = nsi.Keys(secondaryNetConfig[0]) + m.On("ByIndex", IndexCustom, info.secondaryNetKeys[0]).Return([]interface{}{&info}, nil) } if len(ip) > 0 { info.ips = []string{ip} @@ -90,7 +107,6 @@ func (m *IndexerMock) MockNode(ip, name string) { Type: "Node", ObjectMeta: metav1.ObjectMeta{Name: name}, ips: []string{ip}, - macs: []string{}, }}, nil) } @@ -99,7 +115,6 @@ func (m *IndexerMock) MockService(ip, name, namespace string) { Type: "Service", ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, ips: []string{ip}, - macs: []string{}, }}, nil) } @@ -143,26 +158,26 @@ func SetupIndexerMocks(kd *Informers) (pods, nodes, svc, rs *IndexerMock) { type FakeInformers struct { InformersInterface - ipInfo map[string]*Info - macInfo map[string]*Info - nodes map[string]*Info + ipInfo map[string]*Info + customKeysInfo map[string]*Info + nodes map[string]*Info } -func SetupStubs(ipInfo map[string]*Info, macInfo map[string]*Info, nodes map[string]*Info) *FakeInformers { +func SetupStubs(ipInfo map[string]*Info, customKeysInfo map[string]*Info, nodes map[string]*Info) *FakeInformers { return &FakeInformers{ - ipInfo: ipInfo, - macInfo: macInfo, - nodes: nodes, + ipInfo: ipInfo, + customKeysInfo: customKeysInfo, + nodes: nodes, } } -func (f *FakeInformers) InitFromConfig(_ api.NetworkTransformKubeConfig) error { +func (f *FakeInformers) InitFromConfig(_ api.NetworkTransformKubeConfig, _ *operational.Metrics) error { return nil } -func (f *FakeInformers) GetInfo(ip string, mac string) (*Info, error) { - if len(mac) > 0 { - i := f.macInfo[mac] +func (f *FakeInformers) GetInfo(keys []cni.SecondaryNetKey, ip string) (*Info, error) { + if len(keys) > 0 { + i := f.customKeysInfo[keys[0].Key] if i != nil { return i, nil } @@ -175,6 +190,11 @@ func (f *FakeInformers) GetInfo(ip string, mac string) (*Info, error) { return nil, errors.New("notFound") } +func (f *FakeInformers) BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey { + m := cni.MultusHandler{} + return m.BuildKeys(flow, rule, secondaryNetConfig) +} + func (f *FakeInformers) GetNodeInfo(n string) (*Info, error) { i := f.nodes[n] if i != nil { diff --git a/pkg/pipeline/transform/kubernetes/informers/informers.go b/pkg/pipeline/transform/kubernetes/informers/informers.go index 11a195099..2c7a4a4ce 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers.go @@ -20,14 +20,16 @@ package informers import ( "fmt" "net" - "strings" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/netobserv/flowlogs-pipeline/pkg/utils" "github.com/sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -41,24 +43,27 @@ import ( const ( kubeConfigEnvVariable = "KUBECONFIG" syncTime = 10 * time.Minute + IndexCustom = "byCustomKey" IndexIP = "byIP" - IndexMAC = "byMAC" TypeNode = "Node" TypePod = "Pod" TypeService = "Service" ) -var log = logrus.WithField("component", "transform.Network.Kubernetes") -var cniPlugins = map[string]cni.Plugin{ - api.OVN: &cni.OVNPlugin{}, - api.Multus: &cni.MultusPlugin{}, -} +var ( + log = logrus.WithField("component", "transform.Network.Kubernetes") + cniPlugins = map[string]cni.Plugin{ + api.OVN: &cni.OVNPlugin{}, + } + multus = cni.MultusHandler{} +) //nolint:revive type InformersInterface interface { - GetInfo(string, string) (*Info, error) + BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey + GetInfo([]cni.SecondaryNetKey, string) (*Info, error) GetNodeInfo(string) (*Info, error) - InitFromConfig(api.NetworkTransformKubeConfig) error + InitFromConfig(api.NetworkTransformKubeConfig, *operational.Metrics) error } type Informers struct { @@ -68,9 +73,12 @@ type Informers struct { nodes cache.SharedIndexInformer services cache.SharedIndexInformer // replicaSets caches the ReplicaSets as partially-filled *ObjectMeta pointers - replicaSets cache.SharedIndexInformer - stopChan chan struct{} - mdStopChan chan struct{} + replicaSets cache.SharedIndexInformer + stopChan chan struct{} + mdStopChan chan struct{} + managedCNI []string + secondaryNetworks []api.SecondaryNetwork + indexerHitMetric *prometheus.CounterVec } type Owner struct { @@ -86,25 +94,30 @@ type Owner struct { type Info struct { // Informers need that internal object is an ObjectMeta instance metav1.ObjectMeta - Type string - Owner Owner - HostName string - HostIP string - ips []string - macs []string + Type string + Owner Owner + HostName string + HostIP string + NetworkName string + ips []string + secondaryNetKeys []string } -var commonIndexers = map[string]cache.IndexFunc{ - IndexIP: func(obj interface{}) ([]string, error) { +var ( + ipIndexer = func(obj interface{}) ([]string, error) { return obj.(*Info).ips, nil - }, - IndexMAC: func(obj interface{}) ([]string, error) { - return obj.(*Info).macs, nil - }, + } + customKeyIndexer = func(obj interface{}) ([]string, error) { + return obj.(*Info).secondaryNetKeys, nil + } +) + +func (k *Informers) BuildSecondaryNetworkKeys(flow config.GenericMap, rule *api.K8sRule) []cni.SecondaryNetKey { + return multus.BuildKeys(flow, rule, k.secondaryNetworks) } -func (k *Informers) GetInfo(ip string, mac string) (*Info, error) { - if info, ok := k.fetchInformers(ip, mac); ok { +func (k *Informers) GetInfo(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, error) { + if info, ok := k.fetchInformers(potentialKeys, ip); ok { // Owner data might be discovered after the owned, so we fetch it // at the last moment if info.Owner.Name == "" { @@ -116,46 +129,80 @@ func (k *Informers) GetInfo(ip string, mac string) (*Info, error) { return nil, fmt.Errorf("informers can't find IP %s", ip) } -func (k *Informers) fetchInformers(ip string, mac string) (*Info, bool) { - if info, ok := infoForIPOrMac(k.pods.GetIndexer(), ip, mac); ok { +func (k *Informers) fetchInformers(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, bool) { + if info, ok := k.fetchPodInformer(potentialKeys, ip); ok { // it might happen that the Host is discovered after the Pod if info.HostName == "" { info.HostName = k.getHostName(info.HostIP) } return info, true } - if info, ok := infoForIPOrMac(k.nodes.GetIndexer(), ip, mac); ok { + // Nodes are only indexed by IP + if info, ok := k.infoForIP(k.nodes.GetIndexer(), "Node", ip); ok { return info, true } - if info, ok := infoForIPOrMac(k.services.GetIndexer(), ip, mac); ok { + // Services are only indexed by IP + if info, ok := k.infoForIP(k.services.GetIndexer(), "Service", ip); ok { return info, true } return nil, false } -func infoForIPOrMac(idx cache.Indexer, ip string, mac string) (*Info, bool) { - // check for mac first - if len(mac) > 0 { - objs, err := idx.ByIndex(IndexMAC, strings.ToUpper(mac)) +func (k *Informers) fetchPodInformer(potentialKeys []cni.SecondaryNetKey, ip string) (*Info, bool) { + // 1. Check if the unique key matches any Pod (secondary networks / multus case) + if info, ok := k.infoForCustomKeys(k.pods.GetIndexer(), "Pod", potentialKeys); ok { + return info, ok + } + // 2. Check if the IP matches any Pod (primary network) + return k.infoForIP(k.pods.GetIndexer(), "Pod", ip) +} + +func (k *Informers) increaseIndexerHits(kind, namespace, network, warn string) { + k.indexerHitMetric.WithLabelValues(kind, namespace, network, warn).Inc() +} + +func (k *Informers) infoForCustomKeys(idx cache.Indexer, kind string, potentialKeys []cni.SecondaryNetKey) (*Info, bool) { + for _, key := range potentialKeys { + objs, err := idx.ByIndex(IndexCustom, key.Key) if err != nil { - log.WithError(err).WithField("mac", mac).Debug("error accessing mac index. Ignoring") + k.increaseIndexerHits(kind, "", key.NetworkName, "informer error") + log.WithError(err).WithField("key", key).Debug("error accessing unique key index, ignoring") return nil, false } if len(objs) > 0 { - log.Tracef("infoForIPOrMac found mac %v", objs[0].(*Info)) - return objs[0].(*Info), true + info := objs[0].(*Info) + info.NetworkName = key.NetworkName + if len(objs) > 1 { + k.increaseIndexerHits(kind, info.Namespace, key.NetworkName, "multiple matches") + log.WithField("key", key).Debugf("found %d objects matching this key, returning first", len(objs)) + } else { + k.increaseIndexerHits(kind, info.Namespace, key.NetworkName, "") + } + log.Tracef("infoForUniqueKey found key %v", info) + return info, true } } + return nil, false +} - // then check for ip +func (k *Informers) infoForIP(idx cache.Indexer, kind string, ip string) (*Info, bool) { objs, err := idx.ByIndex(IndexIP, ip) if err != nil { - log.WithError(err).WithField("ip", ip).Debug("error accessing ip index. Ignoring") + k.increaseIndexerHits(kind, "", "primary", "informer error") + log.WithError(err).WithField("ip", ip).Debug("error accessing IP index, ignoring") return nil, false } if len(objs) > 0 { - log.Tracef("infoForIPOrMac found ip %v", objs[0].(*Info)) - return objs[0].(*Info), true + info := objs[0].(*Info) + info.NetworkName = "primary" + if len(objs) > 1 { + k.increaseIndexerHits(kind, info.Namespace, "primary", "multiple matches") + log.WithField("ip", ip).Debugf("found %d objects matching this IP, returning first", len(objs)) + } else { + k.increaseIndexerHits(kind, info.Namespace, "primary", "") + } + log.Tracef("infoForIP found ip %v", info) + return info, true } return nil, false } @@ -203,14 +250,14 @@ func (k *Informers) getOwner(info *Info) Owner { func (k *Informers) getHostName(hostIP string) string { if hostIP != "" { - if info, ok := infoForIPOrMac(k.nodes.GetIndexer(), hostIP, ""); ok { + if info, ok := k.infoForIP(k.nodes.GetIndexer(), "Node (indirect)", hostIP); ok { return info.Name } } return "" } -func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, managedCNI []string) error { +func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory) error { nodes := informerFactory.Core().V1().Nodes().Informer() // Transform any *v1.Node instance into a *Info instance to save space // in the informer's cache @@ -231,7 +278,7 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, } } // CNI-dependent logic (must not fail when the CNI is not installed) - for _, name := range managedCNI { + for _, name := range k.managedCNI { if plugin := cniPlugins[name]; plugin != nil { moreIPs := plugin.GetNodeIPs(node) if moreIPs != nil { @@ -242,12 +289,10 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, return &Info{ ObjectMeta: metav1.ObjectMeta{ - Name: node.Name, - Namespace: "", - Labels: node.Labels, + Name: node.Name, + Labels: node.Labels, }, ips: ips, - macs: []string{}, Type: TypeNode, // We duplicate HostIP and HostName information to simplify later filtering e.g. by // Host IP, where we want to get all the Pod flows by src/dst host, but also the actual @@ -258,14 +303,15 @@ func (k *Informers) initNodeInformer(informerFactory inf.SharedInformerFactory, }); err != nil { return fmt.Errorf("can't set nodes transform: %w", err) } - if err := nodes.AddIndexers(commonIndexers); err != nil { + indexers := cache.Indexers{IndexIP: ipIndexer} + if err := nodes.AddIndexers(indexers); err != nil { return fmt.Errorf("can't add %s indexer to Nodes informer: %w", IndexIP, err) } k.nodes = nodes return nil } -func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, managedCNI []string) error { +func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory) error { pods := informerFactory.Core().V1().Pods().Informer() // Transform any *v1.Pod instance into a *Info instance to save space // in the informer's cache @@ -275,25 +321,19 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, m return nil, fmt.Errorf("was expecting a Pod. Got: %T", i) } ips := make([]string, 0, len(pod.Status.PodIPs)) - macs := []string{} for _, ip := range pod.Status.PodIPs { // ignoring host-networked Pod IPs if ip.IP != pod.Status.HostIP { ips = append(ips, ip.IP) } } - // CNI-dependent logic (must not fail when the CNI is not installed) - for _, name := range managedCNI { - if plugin := cniPlugins[name]; plugin != nil { - moreIPs, moreMacs := plugin.GetPodIPsAndMACs(pod) - if moreIPs != nil { - ips = append(ips, moreIPs...) - } - if moreMacs != nil { - macs = append(macs, moreMacs...) - } - } + // Index from secondary network info + keys, err := multus.GetPodUniqueKeys(pod, k.secondaryNetworks) + if err != nil { + // Log the error as Info, do not block other ips indexing + log.WithError(err).Infof("Secondary network cannot be identified") } + return &Info{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, @@ -301,17 +341,21 @@ func (k *Informers) initPodInformer(informerFactory inf.SharedInformerFactory, m Labels: pod.Labels, OwnerReferences: pod.OwnerReferences, }, - Type: TypePod, - HostIP: pod.Status.HostIP, - HostName: pod.Spec.NodeName, - ips: ips, - macs: macs, + Type: TypePod, + HostIP: pod.Status.HostIP, + HostName: pod.Spec.NodeName, + secondaryNetKeys: keys, + ips: ips, }, nil }); err != nil { return fmt.Errorf("can't set pods transform: %w", err) } - if err := pods.AddIndexers(commonIndexers); err != nil { - return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) + indexers := cache.Indexers{ + IndexIP: ipIndexer, + IndexCustom: customKeyIndexer, + } + if err := pods.AddIndexers(indexers); err != nil { + return fmt.Errorf("can't add indexers to Pods informer: %w", err) } k.pods = pods @@ -342,12 +386,12 @@ func (k *Informers) initServiceInformer(informerFactory inf.SharedInformerFactor }, Type: TypeService, ips: ips, - macs: []string{}, }, nil }); err != nil { return fmt.Errorf("can't set services transform: %w", err) } - if err := services.AddIndexers(commonIndexers); err != nil { + indexers := cache.Indexers{IndexIP: ipIndexer} + if err := services.AddIndexers(indexers); err != nil { return fmt.Errorf("can't add %s indexer to Pods informer: %w", IndexIP, err) } @@ -380,7 +424,7 @@ func (k *Informers) initReplicaSetInformer(informerFactory metadatainformer.Shar return nil } -func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { +func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig, opMetrics *operational.Metrics) error { // Initialization variables k.stopChan = make(chan struct{}) k.mdStopChan = make(chan struct{}) @@ -400,11 +444,13 @@ func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { return err } - cnis := cfg.ManagedCNI - if cnis == nil { - cnis = []string{api.OVN, api.Multus} + k.managedCNI = cfg.ManagedCNI + if k.managedCNI == nil { + k.managedCNI = []string{api.OVN} } - err = k.initInformers(kubeClient, metaKubeClient, cnis) + k.secondaryNetworks = cfg.SecondaryNetworks + k.indexerHitMetric = opMetrics.CreateIndexerHitCounter() + err = k.initInformers(kubeClient, metaKubeClient) if err != nil { return err } @@ -412,14 +458,14 @@ func (k *Informers) InitFromConfig(cfg api.NetworkTransformKubeConfig) error { return nil } -func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface, managedCNI []string) error { +func (k *Informers) initInformers(client kubernetes.Interface, metaClient metadata.Interface) error { informerFactory := inf.NewSharedInformerFactory(client, syncTime) metadataInformerFactory := metadatainformer.NewSharedInformerFactory(metaClient, syncTime) - err := k.initNodeInformer(informerFactory, managedCNI) + err := k.initNodeInformer(informerFactory) if err != nil { return err } - err = k.initPodInformer(informerFactory, managedCNI) + err = k.initPodInformer(informerFactory) if err != nil { return err } diff --git a/pkg/pipeline/transform/kubernetes/informers/informers_test.go b/pkg/pipeline/transform/kubernetes/informers/informers_test.go index 09a6c2d84..0de4ba5d6 100644 --- a/pkg/pipeline/transform/kubernetes/informers/informers_test.go +++ b/pkg/pipeline/transform/kubernetes/informers/informers_test.go @@ -20,15 +20,19 @@ package informers import ( "testing" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/cni" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestGetInfo(t *testing.T) { - kubeData := Informers{} + metrics := operational.NewMetrics(&config.MetricsSettings{}) + kubeData := Informers{indexerHitMetric: metrics.CreateIndexerHitCounter()} pidx, hidx, sidx, ridx := SetupIndexerMocks(&kubeData) - pidx.MockPod("1.2.3.4", "AA:BB:CC:DD:EE:FF", "pod1", "podNamespace", "10.0.0.1", nil) - pidx.MockPod("1.2.3.5", "", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) + pidx.MockPod("1.2.3.4", "AA:BB:CC:DD:EE:FF", "eth0", "pod1", "podNamespace", "10.0.0.1", nil) + pidx.MockPod("1.2.3.5", "", "", "pod2", "podNamespace", "10.0.0.1", &Owner{Name: "rs1", Type: "ReplicaSet"}) pidx.FallbackNotFound() ridx.MockReplicaSet("rs1", "podNamespace", Owner{Name: "dep1", Type: "Deployment"}) ridx.FallbackNotFound() @@ -38,7 +42,7 @@ func TestGetInfo(t *testing.T) { hidx.FallbackNotFound() // Test get orphan pod - info, err := kubeData.GetInfo("1.2.3.4", "") + info, err := kubeData.GetInfo(nil, "1.2.3.4") require.NoError(t, err) pod1 := Info{ Type: "Pod", @@ -46,24 +50,26 @@ func TestGetInfo(t *testing.T) { Name: "pod1", Namespace: "podNamespace", }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "pod1", Type: "Pod"}, - ips: []string{"1.2.3.4"}, - macs: []string{"AA:BB:CC:DD:EE:FF"}, + HostName: "node1", + HostIP: "10.0.0.1", + Owner: Owner{Name: "pod1", Type: "Pod"}, + NetworkName: "primary", + ips: []string{"1.2.3.4"}, + secondaryNetKeys: []string{"~~AA:BB:CC:DD:EE:FF"}, } - require.Equal(t, *info, pod1) + require.Equal(t, pod1, *info) // Test get same pod by mac - info, err = kubeData.GetInfo("", "AA:BB:CC:DD:EE:FF") + info, err = kubeData.GetInfo([]cni.SecondaryNetKey{{NetworkName: "custom-network", Key: "~~AA:BB:CC:DD:EE:FF"}}, "") require.NoError(t, err) - require.Equal(t, *info, pod1) + pod1.NetworkName = "custom-network" + require.Equal(t, pod1, *info) // Test get pod owned - info, err = kubeData.GetInfo("1.2.3.5", "") + info, err = kubeData.GetInfo(nil, "1.2.3.5") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Pod", ObjectMeta: metav1.ObjectMeta{ Name: "pod2", @@ -73,44 +79,45 @@ func TestGetInfo(t *testing.T) { Name: "rs1", }}, }, - HostName: "node1", - HostIP: "10.0.0.1", - Owner: Owner{Name: "dep1", Type: "Deployment"}, - ips: []string{"1.2.3.5"}, - macs: []string{}, - }) + HostName: "node1", + HostIP: "10.0.0.1", + Owner: Owner{Name: "dep1", Type: "Deployment"}, + NetworkName: "primary", + ips: []string{"1.2.3.5"}, + secondaryNetKeys: []string{}, + }, *info) // Test get node - info, err = kubeData.GetInfo("10.0.0.1", "") + info, err = kubeData.GetInfo(nil, "10.0.0.1") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Node", ObjectMeta: metav1.ObjectMeta{ Name: "node1", }, - Owner: Owner{Name: "node1", Type: "Node"}, - ips: []string{"10.0.0.1"}, - macs: []string{}, - }) + Owner: Owner{Name: "node1", Type: "Node"}, + NetworkName: "primary", + ips: []string{"10.0.0.1"}, + }, *info) // Test get service - info, err = kubeData.GetInfo("1.2.3.100", "") + info, err = kubeData.GetInfo(nil, "1.2.3.100") require.NoError(t, err) - require.Equal(t, *info, Info{ + require.Equal(t, Info{ Type: "Service", ObjectMeta: metav1.ObjectMeta{ Name: "svc1", Namespace: "svcNamespace", }, - Owner: Owner{Name: "svc1", Type: "Service"}, - ips: []string{"1.2.3.100"}, - macs: []string{}, - }) + Owner: Owner{Name: "svc1", Type: "Service"}, + NetworkName: "primary", + ips: []string{"1.2.3.100"}, + }, *info) // Test no match - info, err = kubeData.GetInfo("1.2.3.200", "") + info, err = kubeData.GetInfo(nil, "1.2.3.200") require.NotNil(t, err) require.Nil(t, info) } diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index e8ec0c2c5..b79e6759c 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -26,6 +26,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/location" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/netdb" @@ -166,7 +167,7 @@ func (n *Network) applySubnetLabel(strIP string) string { // NewTransformNetwork create a new transform // //nolint:cyclop -func NewTransformNetwork(params config.StageParam) (Transformer, error) { +func NewTransformNetwork(params config.StageParam, opMetrics *operational.Metrics) (Transformer, error) { var needToInitLocationDB = false var needToInitKubeData = false var needToInitNetworkServices = false @@ -205,7 +206,7 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } if needToInitKubeData { - err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfig) + err := kubernetes.InitFromConfig(jsonNetworkTransform.KubeConfig, opMetrics) if err != nil { return nil, err } diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 20e2bc700..4738dc9bc 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -211,7 +211,7 @@ func InitNewTransformNetwork(t *testing.T, configFile string) Transformer { v, cfg := test.InitConfig(t, configFile) require.NotNil(t, v) config := cfg.Parameters[0] - newTransform, err := NewTransformNetwork(config) + newTransform, err := NewTransformNetwork(config, nil) require.NoError(t, err) return newTransform } @@ -246,7 +246,7 @@ func Test_Categorize(t *testing.T) { }, } - tr, err := NewTransformNetwork(cfg) + tr, err := NewTransformNetwork(cfg, nil) require.NoError(t, err) output, ok := tr.Transform(entry) @@ -280,7 +280,7 @@ func Test_ReinterpretDirection(t *testing.T) { }, } - tr, err := NewTransformNetwork(cfg) + tr, err := NewTransformNetwork(cfg, nil) require.NoError(t, err) output, ok := tr.Transform(config.GenericMap{ @@ -430,7 +430,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing ReporterIPField") // Missing src field @@ -448,7 +448,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing SrcHostField") // Missing dst field @@ -466,7 +466,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing DstHostField") // Missing flow direction field @@ -484,7 +484,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.Contains(t, err.Error(), "missing FlowDirectionField") // Missing if direction field does not trigger an error (this field will just not be populated) @@ -502,7 +502,7 @@ func Test_ValidateReinterpretDirection(t *testing.T) { }, }, }, - }) + }, nil) require.NoError(t, err) // Test transformation when IfDirection is missing diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 8ea125330..166e1cfeb 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ b/pkg/test/e2e/pipline/flp-config.yaml @@ -69,7 +69,7 @@ data: subnet_mask: /16 - type: add_kubernetes kubernetes: - input: srcIP + ipField: srcIP output: srcK8S labels_prefix: srcK8S_labels - type: add_location