diff --git a/pkg/pipeline/transform/transform_filter.go b/pkg/pipeline/transform/transform_filter.go index 4fd7e7a2b..bd2783177 100644 --- a/pkg/pipeline/transform/transform_filter.go +++ b/pkg/pipeline/transform/transform_filter.go @@ -32,7 +32,8 @@ func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap { log.Debugf("f = %v", f) output := make([]config.GenericMap, 0) for _, entry := range input { - outputEntry := entry + // copy input entry before transform to avoid alteration on parallel stages + outputEntry := entry.Copy() addToOutput := true for _, rule := range f.Rules { log.Debugf("rule = %v", rule) diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index ca1aed65b..68bcac3d7 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -39,9 +39,9 @@ type Network struct { api.TransformNetwork } -func (n *Network) Transform(inputEntries []config.GenericMap) []config.GenericMap { +func (n *Network) Transform(input []config.GenericMap) []config.GenericMap { outputEntries := make([]config.GenericMap, 0) - for _, entry := range inputEntries { + for _, entry := range input { outputEntry := n.TransformEntry(entry) outputEntries = append(outputEntries, outputEntry) } @@ -49,7 +49,8 @@ func (n *Network) Transform(inputEntries []config.GenericMap) []config.GenericMa } func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap { - outputEntries := inputEntry + // copy input entry before transform to avoid alteration on parallel stages + outputEntry := inputEntry.Copy() for _, rule := range n.Rules { switch rule.Type { @@ -59,7 +60,7 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap panic(err) } buf := &bytes.Buffer{} - err = template.Execute(buf, outputEntries) + err = template.Execute(buf, outputEntry) if err != nil { panic(err) } @@ -67,20 +68,20 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap isNew := connection_tracking.CT.AddFlow(FlowIDFieldsAsString) if isNew { if rule.Parameters != "" { - outputEntries[rule.Output] = rule.Parameters + outputEntry[rule.Output] = rule.Parameters } else { - outputEntries[rule.Output] = true + outputEntry[rule.Output] = true } } case api.TransformNetworkOperationName("AddRegExIf"): - matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntries[rule.Input])) + matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { continue } if matched { - outputEntries[rule.Output] = outputEntries[rule.Input] - outputEntries[rule.Output+"_Matched"] = true + outputEntry[rule.Output] = outputEntry[rule.Input] + outputEntry[rule.Output+"_Matched"] = true } case api.TransformNetworkOperationName("AddIf"): expressionString := fmt.Sprintf("val %s", rule.Parameters) @@ -89,73 +90,73 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap log.Errorf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err) continue } - result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntries[rule.Input]}) + result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]}) if evaluateErr == nil && result.(bool) { - outputEntries[rule.Output] = outputEntries[rule.Input] - outputEntries[rule.Output+"_Evaluate"] = true + outputEntry[rule.Output] = outputEntry[rule.Input] + outputEntry[rule.Output+"_Evaluate"] = true } case api.TransformNetworkOperationName("AddSubnet"): - _, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntries[rule.Input], rule.Parameters)) + _, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters)) if err != nil { - log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntries[rule.Input], rule.Parameters, err) + log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntry[rule.Input], rule.Parameters, err) continue } - outputEntries[rule.Output] = ipv4Net.String() + outputEntry[rule.Output] = ipv4Net.String() case api.TransformNetworkOperationName("AddLocation"): var locationInfo *location.Info - err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntries[rule.Input])) + err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { - log.Errorf("Can't find location for IP %v err %v", outputEntries[rule.Input], err) + log.Errorf("Can't find location for IP %v err %v", outputEntry[rule.Input], err) continue } - outputEntries[rule.Output+"_CountryName"] = locationInfo.CountryName - outputEntries[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName - outputEntries[rule.Output+"_RegionName"] = locationInfo.RegionName - outputEntries[rule.Output+"_CityName"] = locationInfo.CityName - outputEntries[rule.Output+"_Latitude"] = locationInfo.Latitude - outputEntries[rule.Output+"_Longitude"] = locationInfo.Longitude + outputEntry[rule.Output+"_CountryName"] = locationInfo.CountryName + outputEntry[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName + outputEntry[rule.Output+"_RegionName"] = locationInfo.RegionName + outputEntry[rule.Output+"_CityName"] = locationInfo.CityName + outputEntry[rule.Output+"_Latitude"] = locationInfo.Latitude + outputEntry[rule.Output+"_Longitude"] = locationInfo.Longitude case api.TransformNetworkOperationName("AddService"): - protocol := fmt.Sprintf("%v", outputEntries[rule.Parameters]) - portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntries[rule.Input])) + protocol := fmt.Sprintf("%v", outputEntry[rule.Parameters]) + portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.Input])) if err != nil { - log.Errorf("Can't convert port to int: Port %v - err %v", outputEntries[rule.Input], err) + log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.Input], err) continue } service := netdb.GetServByPort(portNumber, netdb.GetProtoByName(protocol)) if service == nil { protocolAsNumber, err := strconv.Atoi(fmt.Sprintf("%v", protocol)) if err != nil { - log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err) + log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err) continue } service = netdb.GetServByPort(portNumber, netdb.GetProtoByNumber(protocolAsNumber)) if service == nil { - log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err) + log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err) continue } } - outputEntries[rule.Output] = service.Name + outputEntry[rule.Output] = service.Name case api.TransformNetworkOperationName("AddKubernetes"): var kubeInfo *kubernetes.Info - kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntries[rule.Input])) + kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input])) if err != nil { - log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntries[rule.Input], err) + log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntry[rule.Input], err) continue } - outputEntries[rule.Output+"_Namespace"] = kubeInfo.Namespace - outputEntries[rule.Output+"_Name"] = kubeInfo.Name - outputEntries[rule.Output+"_Type"] = kubeInfo.Type - outputEntries[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name - outputEntries[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type + outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace + outputEntry[rule.Output+"_Name"] = kubeInfo.Name + outputEntry[rule.Output+"_Type"] = kubeInfo.Type + outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name + outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type if rule.Parameters != "" { for labelKey, labelValue := range kubeInfo.Labels { - outputEntries[rule.Parameters+"_"+labelKey] = labelValue + outputEntry[rule.Parameters+"_"+labelKey] = labelValue } } if kubeInfo.HostIP != "" { - outputEntries[rule.Output+"_HostIP"] = kubeInfo.HostIP + outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP if kubeInfo.HostName != "" { - outputEntries[rule.Output+"_HostName"] = kubeInfo.HostName + outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName } } default: @@ -163,7 +164,7 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap } } - return outputEntries + return outputEntry } // NewTransformNetwork create a new transform