Skip to content

Commit

Permalink
NETOBSERV-1875: Make enrichment indexes configurable (#711)
Browse files Browse the repository at this point in the history
* NETOBSERV-1875: Make enrichment indexes configurable

For secondary interfaces, it's better to allow more configurable
indexing for pods enrichment. For some network types, pods might be identified by their MAC
address advertized in the network-status annotations, while in others it
can be by IP, by interface name, or any combination of those.

This PR makes it all configurable in that way (example):

```bash
secondaryNetworks:
- name: my-network
  index:
    mac: true
    ip: true
```

More than one network can be configured that way.

Additionally, a new metric is added to track which indexer is used for
every enrichment hit: promql:
`sum(rate(netobserv_secondary_network_indexer_hit[1m])) by (kind,
network, error)`

This metric can also report errors and unexpected multiple matches

* Add namespace label to metric, add network name enrichment

* do not return from building keys when a field is missing: move to next network
  • Loading branch information
jotak authored Sep 20, 2024
1 parent 61bd0fb commit 2ea1a7c
Show file tree
Hide file tree
Showing 22 changed files with 529 additions and 328 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ parameters:
output: dstLocation
- type: add_kubernetes
kubernetes:
input: srcIP
ipField: srcIP
output: srcK8S
```

Expand All @@ -443,7 +443,7 @@ All the geo-location fields will be named by appending `output` value
(e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`)

The rule `add_kubernetes` generates new fields with kubernetes information by
matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
matching the `ipField` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
All the kubernetes fields will be named by appending `output` value
(`srcK8S` in the example above) to the kubernetes metadata field names
(e.g., `Namespace`, `Name`, `Type`, `HostIP`, `OwnerName`, `OwnerType` )
Expand Down
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestPipelineConfigSetup(t *testing.T) {

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"ipField\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"ipField\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Health": {
"Port": "8080"
},
Expand Down
2 changes: 1 addition & 1 deletion contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ parameters:
subnet_mask: /16
- type: add_kubernetes
kubernetes:
input: srcIP
ipField: srcIP
output: srcK8S
labels_prefix: srcK8S_labels
- type: add_location
Expand Down
10 changes: 7 additions & 3 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ Following is the supported API format for network transformations:
name: name of the object
namespace: namespace of the object
kubernetes: Kubernetes rule configuration
input: entry IP input field
mac-input: Optional entry MAC input field
ipField: entry IP input field
interfacesField: entry Interfaces input field
macField: entry MAC input field
output: entry output field
assignee: value needs to assign to output field
labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied
Expand All @@ -272,7 +273,10 @@ Following is the supported API format for network transformations:
protocol: entry protocol field
kubeConfig: global configuration related to Kubernetes (optional)
configPath: path to kubeconfig file (optional)
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus
secondaryNetworks: configuration for secondary networks
name: name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'
index: fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'
managedCNI: a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn
servicesFile: path to services file (optional, default: /etc/services)
protocolsFile: path to protocols file (optional, default: /etc/protocols)
subnetLabels: configure subnet and IPs custom labels
Expand Down
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | stage |


### secondary_network_indexer_hit
| **Name** | secondary_network_indexer_hit |
|:---|:---|
| **Description** | Counter of hits per secondary network index for Kubernetes enrichment |
| **Type** | counter |
| **Labels** | kind, network, warning |


### stage_duration_ms
| **Name** | stage_duration_ms |
|:---|:---|
Expand Down
26 changes: 16 additions & 10 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
}

const (
OVN = "ovn"
Multus = "multus"
OVN = "ovn"
)

type NetworkTransformKubeConfig struct {
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn, multus"`
ConfigPath string `yaml:"configPath,omitempty" json:"configPath,omitempty" doc:"path to kubeconfig file (optional)"`
SecondaryNetworks []SecondaryNetwork `yaml:"secondaryNetworks,omitempty" json:"secondaryNetworks,omitempty" doc:"configuration for secondary networks"`
ManagedCNI []string `yaml:"managedCNI,omitempty" json:"managedCNI,omitempty" doc:"a list of CNI (network plugins) to manage, for detecting additional interfaces. Currently supported: ovn"`
}

type TransformNetworkOperationEnum string
Expand Down Expand Up @@ -84,12 +84,18 @@ type K8sReference struct {
}

type K8sRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry IP input field"`
MacInput string `yaml:"mac-input,omitempty" json:"mac-input,omitempty" doc:"Optional entry MAC input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"`
IPField string `yaml:"ipField,omitempty" json:"ipField,omitempty" doc:"entry IP input field"`
InterfacesField string `yaml:"interfacesField,omitempty" json:"interfacesField,omitempty" doc:"entry Interfaces input field"`
MACField string `yaml:"macField,omitempty" json:"macField,omitempty" doc:"entry MAC input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"if true the rule will add the zone"`
}

type SecondaryNetwork struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the secondary network, as mentioned in the annotation 'k8s.v1.cni.cncf.io/network-status'"`
Index map[string]any `yaml:"index,omitempty" json:"index,omitempty" doc:"fields to use for indexing, must be any combination of 'mac', 'ip', 'interface'"`
}

type NetworkGenericRule struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/confgen/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (

func Test_dedupeNetworkTransformRules(t *testing.T) {
slice := api.NetworkTransformRules{
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
}
expected := api.NetworkTransformRules{
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{IPField: "i3", Output: "o3"}},
}
actual := dedupeNetworkTransformRules(slice)

Expand Down
20 changes: 10 additions & 10 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func TestLokiPipeline(t *testing.T) {
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
IPField: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
IPField: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"})
Expand All @@ -58,7 +58,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down Expand Up @@ -206,14 +206,14 @@ func TestIPFIXPipeline(t *testing.T) {
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
IPField: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.NetworkAddKubernetes,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
IPField: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteIpfix("ipfix", api.WriteIpfix{
Expand All @@ -238,7 +238,7 @@ func TestIPFIXPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"kubeConfig":{},"rules":[{"kubernetes":{"ipField":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"ipField":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down
13 changes: 13 additions & 0 deletions pkg/operational/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ var (
TypeHistogram,
"stage",
)
indexerHit = DefineMetric(
"secondary_network_indexer_hit",
"Counter of hits per secondary network index for Kubernetes enrichment",
TypeCounter,
"kind",
"namespace",
"network",
"warning",
)
)

func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels {
Expand Down Expand Up @@ -241,6 +250,10 @@ func (o *Metrics) GetOrCreateStageDurationHisto() *prometheus.HistogramVec {
return o.stageDurationHisto
}

func (o *Metrics) CreateIndexerHitCounter() *prometheus.CounterVec {
return o.NewCounterVec(&indexerHit)
}

func GetDocumentation() string {
doc := ""
sort.Slice(allMetrics, func(i, j int) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func getWriter(opMetrics *operational.Metrics, params config.StageParam) (write.
return writer, err
}

func getTransformer(_ *operational.Metrics, params config.StageParam) (transform.Transformer, error) {
func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (transform.Transformer, error) {
var transformer transform.Transformer
var err error
switch params.Transform.Type {
Expand All @@ -443,7 +443,7 @@ func getTransformer(_ *operational.Metrics, params config.StageParam) (transform
case api.FilterType:
transformer, err = transform.NewTransformFilter(params)
case api.NetworkType:
transformer, err = transform.NewTransformNetwork(params)
transformer, err = transform.NewTransformNetwork(params, opMetrics)
case api.NoneType:
transformer, err = transform.NewTransformNone()
default:
Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/transform/kubernetes/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ import (

type Plugin interface {
GetNodeIPs(node *v1.Node) []string
GetPodIPsAndMACs(pod *v1.Pod) ([]string, []string)
}
Loading

0 comments on commit 2ea1a7c

Please sign in to comment.