Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1500 : Refactoring of transform network API #580

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestPipelineConfigSetup(t *testing.T) {

js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstAddr\",\"output\":\"DstK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstPort\",\"output\":\"Service\",\"parameters\":\"Proto\",\"type\":\"add_service\"},{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"parameters\":\"/16\",\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"kubernetes\":{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\"},\"type\":\"add_kubernetes\"},{\"kubernetes\":{\"input\":\"DstAddr\",\"output\":\"DstK8S\"},\"type\":\"add_kubernetes\"},{\"add_service\":{\"input\":\"DstPort\",\"output\":\"Service\",\"protocol\":\"Proto\"},\"type\":\"add_service\"},{\"add_subnet\":{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"subnet_mask\":\"/16\"},\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Health": {
"Port": "8080"
},
Expand Down
49 changes: 38 additions & 11 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,50 @@ func TransformNetworkOperationName(operation string) string {
}

type NetworkTransformRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule specific configuration"`
Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule specific configuration"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"`
Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"`
AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"`
AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"`
AddIPCategory *NetworkGenericRule `yaml:"add_ip_category,omitempty" json:"add_ip_category,omitempty" doc:"Add ip category rule configuration"`
AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"`
Comment on lines +67 to +72
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we often have same type of rules applied in a row; on both source & destination for example.

Would it make sense to allow arrays on the objects above ?

Then you will be able to call something like:

api.NetworkTransformRules{{
    Type: api.AddKubernetesRuleType,
    Kubernetes: []api.K8sRule{
      api.K8sRule{
          Input:  "SrcAddr",
          Output: "SrcK8S",
      }, 
      api.K8sRule{
          Input:  "DstAddr",
          Output: "DstK8S",
      },
    },
}}})

We could even get rid of the type by just providing objects 🤔

Copy link
Contributor Author

@OlivierCazade OlivierCazade Feb 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could remove the type field but this would change consistency with the rest of the API that is using it.

About allowing an array, this mean we would have two nested level of rules instead of having one flat array of rules. This add complexity and I am not sure to see a value in doing it.

}

type K8sInfraRule struct {
Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
InfraPrefix string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"`
Inputs []string `yaml:"inputs,omitempty" json:"inputs,omitempty" doc:"entry inputs fields"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
InfraPrefixes []string `yaml:"infra_prefixes,omitempty" json:"infra_prefixes,omitempty" doc:"Namespace prefixes that will be tagged as infra"`
InfraRefs []K8sReference `yaml:"infra_refs,omitempty" json:"infra_refs,omitempty" doc:"Additional object references to be tagged as infra"`
}

type K8sReference struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"name of the object"`
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty" doc:"namespace of the object"`
}

type K8sRule struct {
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
LabelsPrefix string `yaml:"labels_prefix,omitempty" json:"labels_prefix,omitempty" doc:"labels prefix to use to copy input lables, if empty labels will not be copied"`
AddZone bool `yaml:"add_zone,omitempty" json:"add_zone,omitempty" doc:"If true the rule will add the zone"`
}

type NetworkGenericRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
}

type NetworkAddSubnetRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
SubnetMask string `yaml:"subnet_mask,omitempty" json:"subnet_mask,omitempty" doc:"subnet mask field"`
}

type NetworkAddServiceRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Protocol string `yaml:"protocol,omitempty" json:"protocol,omitempty" doc:"entry protocol field"`
}

type NetworkTransformDirectionInfo struct {
Expand Down
30 changes: 18 additions & 12 deletions pkg/confgen/confgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ func Test_RunShortConfGen(t *testing.T) {
// Expects transform network
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
require.Equal(t, api.NetworkTransformRule{
Input: "testInput",
Output: "testOutput",
Type: "add_service",
Parameters: "proto",
Type: "add_service",
AddService: &api.NetworkAddServiceRule{
Input: "testInput",
Output: "testOutput",
Protocol: "proto",
},
}, out.Parameters[1].Transform.Network.Rules[0])

// Expects aggregates
Expand Down Expand Up @@ -212,10 +214,12 @@ func Test_RunConfGenNoAgg(t *testing.T) {
// Expects transform network
require.Len(t, out.Parameters[1].Transform.Network.Rules, 1)
require.Equal(t, api.NetworkTransformRule{
Input: "testInput",
Output: "testOutput",
Type: "add_service",
Parameters: "proto",
Type: "add_service",
AddService: &api.NetworkAddServiceRule{
Input: "testInput",
Output: "testOutput",
Protocol: "proto",
},
}, out.Parameters[1].Transform.Network.Rules[0])

// Expects prom encode
Expand Down Expand Up @@ -299,10 +303,12 @@ func Test_RunLongConfGen(t *testing.T) {
// Expects transform network
require.Len(t, out.Parameters[2].Transform.Network.Rules, 1)
require.Equal(t, api.NetworkTransformRule{
Input: "testInput",
Output: "testOutput",
Type: "add_service",
Parameters: "proto",
Type: "add_service",
AddService: &api.NetworkAddServiceRule{
Input: "testInput",
Output: "testOutput",
Protocol: "proto",
},
}, out.Parameters[2].Transform.Network.Rules[0])

// Expects aggregates
Expand Down
29 changes: 15 additions & 14 deletions pkg/confgen/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,26 @@ func (cg *ConfGen) dedupe() {
cg.aggregates.Rules = dedupeAggregateDefinitions(cg.aggregates.Rules)
}

type void struct{}

var voidMember void

func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTransformRules {
// There are no built-in sets in go
//https://stackoverflow.com/a/34020023/2749989
uniqueSet := make(map[api.NetworkTransformRule]void)
var dedpueSlice []api.NetworkTransformRule
var dedupeSlice []api.NetworkTransformRule
for i, rule := range rules {
if _, exists := uniqueSet[rule]; exists {
// duplicate rule
log.Debugf("Remove duplicate NetworkTransformRule %v at index %v", rule, i)
if containsNetworkTransformRule(dedupeSlice, rule) {
// duplicate aggregateDefinition
log.Debugf("Remove duplicate transformation rule %v at index %v", rule, i)
continue
}
uniqueSet[rule] = voidMember
dedpueSlice = append(dedpueSlice, rule)
dedupeSlice = append(dedupeSlice, rule)
}
return dedpueSlice
return dedupeSlice
}

func containsNetworkTransformRule(slice []api.NetworkTransformRule, rule api.NetworkTransformRule) bool {
for _, item := range slice {
if reflect.DeepEqual(item, rule) {
return true
}
}
return false
}

// dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates.
Expand Down
14 changes: 7 additions & 7 deletions pkg/confgen/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (

func Test_dedupeNetworkTransformRules(t *testing.T) {
slice := api.NetworkTransformRules{
api.NetworkTransformRule{Input: "i1", Output: "o1"},
api.NetworkTransformRule{Input: "i2", Output: "o2"},
api.NetworkTransformRule{Input: "i3", Output: "o3"},
api.NetworkTransformRule{Input: "i2", Output: "o2"},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
}
expected := api.NetworkTransformRules{
api.NetworkTransformRule{Input: "i1", Output: "o1"},
api.NetworkTransformRule{Input: "i2", Output: "o2"},
api.NetworkTransformRule{Input: "i3", Output: "o3"},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i1", Output: "o1"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i2", Output: "o2"}},
api.NetworkTransformRule{Type: "add_kubernetes", Kubernetes: &api.K8sRule{Input: "i3", Output: "o3"}},
}
actual := dedupeNetworkTransformRules(slice)

Expand Down
36 changes: 22 additions & 14 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ import (
func TestLokiPipeline(t *testing.T) {
pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999})
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.AddKubernetesRuleType,
Input: "SrcAddr",
Output: "SrcK8S",
Type: api.AddKubernetesRuleType,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.AddKubernetesRuleType,
Input: "DstAddr",
Output: "DstK8S",
Type: api.AddKubernetesRuleType,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteLoki("loki", api.WriteLoki{URL: "http://loki:3100/"})
stages := pl.GetStages()
Expand All @@ -54,7 +58,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down Expand Up @@ -199,13 +203,17 @@ func TestForkPipeline(t *testing.T) {
func TestIPFIXPipeline(t *testing.T) {
pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999})
pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{
Type: api.AddKubernetesRuleType,
Input: "SrcAddr",
Output: "SrcK8S",
Type: api.AddKubernetesRuleType,
Kubernetes: &api.K8sRule{
Input: "SrcAddr",
Output: "SrcK8S",
},
}, {
Type: api.AddKubernetesRuleType,
Input: "DstAddr",
Output: "DstK8S",
Type: api.AddKubernetesRuleType,
Kubernetes: &api.K8sRule{
Input: "DstAddr",
Output: "DstK8S",
},
}}})
pl = pl.WriteIpfix("ipfix", api.WriteIpfix{
TargetHost: "ipfix-receiver-test",
Expand All @@ -229,7 +237,7 @@ func TestIPFIXPipeline(t *testing.T) {

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"input":"SrcAddr","output":"SrcK8S","type":"add_kubernetes"},{"input":"DstAddr","output":"DstK8S","type":"add_kubernetes"}]}}}`, string(b))
require.JSONEq(t, `{"name":"enrich","transform":{"type":"network","network":{"directionInfo":{},"rules":[{"kubernetes":{"input":"SrcAddr","output":"SrcK8S"},"type":"add_kubernetes"},{"kubernetes":{"input":"DstAddr","output":"DstK8S"},"type":"add_kubernetes"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
Expand Down
50 changes: 23 additions & 27 deletions pkg/pipeline/transform/kubernetes/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"strings"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -20,7 +21,7 @@ func InitFromConfig(kubeConfigPath string) error {
return informers.InitFromConfig(kubeConfigPath)
}

func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
func Enrich(outputEntry config.GenericMap, rule api.K8sRule) {
kubeInfo, err := informers.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %v", outputEntry[rule.Input])
Expand All @@ -36,9 +37,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
if rule.LabelsPrefix != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
outputEntry[rule.LabelsPrefix+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
Expand Down Expand Up @@ -70,9 +71,9 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
outputEntry[rule.Output+"k8s.type"] = kubeInfo.Type
outputEntry[rule.Output+"k8s.owner.name"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"k8s.owner.type"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
if rule.LabelsPrefix != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntry[rule.Parameters+"."+labelKey] = labelValue
outputEntry[rule.LabelsPrefix+"."+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
Expand All @@ -87,8 +88,8 @@ func Enrich(outputEntry config.GenericMap, rule api.NetworkTransformRule) {

const nodeZoneLabelName = "topology.kubernetes.io/zone"

func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule, kubeInfo inf.Info, zonePrefix string) {
if rule.Kubernetes == nil || !rule.Kubernetes.AddZone {
func fillInK8sZone(outputEntry config.GenericMap, rule api.K8sRule, kubeInfo inf.Info, zonePrefix string) {
if !rule.AddZone {
//Nothing to do
return
}
Expand Down Expand Up @@ -119,39 +120,34 @@ func fillInK8sZone(outputEntry config.GenericMap, rule api.NetworkTransformRule,
}
}

func EnrichLayer(outputEntry config.GenericMap, rule api.NetworkTransformRule) {
if rule.KubernetesInfra == nil {
logrus.Error("transformation rule: Missing Kubernetes Infra configuration ")
return
}
outputEntry[rule.KubernetesInfra.Output] = "infra"
for _, input := range rule.KubernetesInfra.Inputs {
if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule.KubernetesInfra.InfraPrefix) {
outputEntry[rule.KubernetesInfra.Output] = "app"
func EnrichLayer(outputEntry config.GenericMap, rule api.K8sInfraRule) {
outputEntry[rule.Output] = "infra"
for _, input := range rule.Inputs {
if objectIsApp(fmt.Sprintf("%s", outputEntry[input]), rule) {
outputEntry[rule.Output] = "app"
return
}
}
}

const openshiftNamespacePrefix = "openshift-"
const openshiftPrefixLen = len(openshiftNamespacePrefix)

func objectIsApp(addr string, additionalInfraPrefix string) bool {
func objectIsApp(addr string, rule api.K8sInfraRule) bool {
obj, err := informers.GetInfo(addr)
if err != nil {
logrus.WithError(err).Tracef("can't find kubernetes info for IP %s", addr)
return false
}
nsLen := len(obj.Namespace)
additionalPrefixLen := len(additionalInfraPrefix)
if nsLen == 0 {
if len(obj.Namespace) == 0 {
return false
}
if nsLen >= openshiftPrefixLen && obj.Namespace[:openshiftPrefixLen] == openshiftNamespacePrefix {
return false
for _, prefix := range rule.InfraPrefixes {
if strings.HasPrefix(obj.Namespace, prefix) {
return false
}
}
if nsLen >= additionalPrefixLen && obj.Namespace[:additionalPrefixLen] == additionalInfraPrefix {
return false
for _, ref := range rule.InfraRefs {
if obj.Namespace == ref.Namespace && obj.Name == ref.Name {
return false
}
}
//Special case with openshift and kubernetes service in default namespace
if obj.Namespace == "default" && (obj.Name == "kubernetes" || obj.Name == "openshift") {
Expand Down
Loading
Loading