Skip to content

Commit

Permalink
Copy maps before write
Browse files Browse the repository at this point in the history
Fixes #234
  • Loading branch information
jotak committed Jun 23, 2022
1 parent 0e670ba commit e99ca07
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *Filter) Transform(input []config.GenericMap) []config.GenericMap {
log.Debugf("f = %v", f)
output := make([]config.GenericMap, 0)
for _, entry := range input {
outputEntry := entry
outputEntry := entry.Copy()
addToOutput := true
for _, rule := range f.Rules {
log.Debugf("rule = %v", rule)
Expand Down
80 changes: 40 additions & 40 deletions pkg/pipeline/transform/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ type Network struct {
api.TransformNetwork
}

func (n *Network) Transform(inputEntries []config.GenericMap) []config.GenericMap {
func (n *Network) Transform(input []config.GenericMap) []config.GenericMap {
outputEntries := make([]config.GenericMap, 0)
for _, entry := range inputEntries {
for _, entry := range input {
outputEntry := n.TransformEntry(entry)
outputEntries = append(outputEntries, outputEntry)
}
return outputEntries
}

func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap {
outputEntries := inputEntry
outputEntry := inputEntry.Copy()

for _, rule := range n.Rules {
switch rule.Type {
Expand All @@ -59,28 +59,28 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap
panic(err)
}
buf := &bytes.Buffer{}
err = template.Execute(buf, outputEntries)
err = template.Execute(buf, outputEntry)
if err != nil {
panic(err)
}
FlowIDFieldsAsString := buf.String()
isNew := connection_tracking.CT.AddFlow(FlowIDFieldsAsString)
if isNew {
if rule.Parameters != "" {
outputEntries[rule.Output] = rule.Parameters
outputEntry[rule.Output] = rule.Parameters
} else {
outputEntries[rule.Output] = true
outputEntry[rule.Output] = true
}
}

case api.TransformNetworkOperationName("AddRegExIf"):
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntries[rule.Input]))
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
continue
}
if matched {
outputEntries[rule.Output] = outputEntries[rule.Input]
outputEntries[rule.Output+"_Matched"] = true
outputEntry[rule.Output] = outputEntry[rule.Input]
outputEntry[rule.Output+"_Matched"] = true
}
case api.TransformNetworkOperationName("AddIf"):
expressionString := fmt.Sprintf("val %s", rule.Parameters)
Expand All @@ -89,81 +89,81 @@ func (n *Network) TransformEntry(inputEntry config.GenericMap) config.GenericMap
log.Errorf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
continue
}
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntries[rule.Input]})
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
if evaluateErr == nil && result.(bool) {
outputEntries[rule.Output] = outputEntries[rule.Input]
outputEntries[rule.Output+"_Evaluate"] = true
outputEntry[rule.Output] = outputEntry[rule.Input]
outputEntry[rule.Output+"_Evaluate"] = true
}
case api.TransformNetworkOperationName("AddSubnet"):
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntries[rule.Input], rule.Parameters))
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters))
if err != nil {
log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntries[rule.Input], rule.Parameters, err)
log.Errorf("Can't find subnet for IP %v and prefix length %s - err %v", outputEntry[rule.Input], rule.Parameters, err)
continue
}
outputEntries[rule.Output] = ipv4Net.String()
outputEntry[rule.Output] = ipv4Net.String()
case api.TransformNetworkOperationName("AddLocation"):
var locationInfo *location.Info
err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntries[rule.Input]))
err, locationInfo := location.GetLocation(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
log.Errorf("Can't find location for IP %v err %v", outputEntries[rule.Input], err)
log.Errorf("Can't find location for IP %v err %v", outputEntry[rule.Input], err)
continue
}
outputEntries[rule.Output+"_CountryName"] = locationInfo.CountryName
outputEntries[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName
outputEntries[rule.Output+"_RegionName"] = locationInfo.RegionName
outputEntries[rule.Output+"_CityName"] = locationInfo.CityName
outputEntries[rule.Output+"_Latitude"] = locationInfo.Latitude
outputEntries[rule.Output+"_Longitude"] = locationInfo.Longitude
outputEntry[rule.Output+"_CountryName"] = locationInfo.CountryName
outputEntry[rule.Output+"_CountryLongName"] = locationInfo.CountryLongName
outputEntry[rule.Output+"_RegionName"] = locationInfo.RegionName
outputEntry[rule.Output+"_CityName"] = locationInfo.CityName
outputEntry[rule.Output+"_Latitude"] = locationInfo.Latitude
outputEntry[rule.Output+"_Longitude"] = locationInfo.Longitude
case api.TransformNetworkOperationName("AddService"):
protocol := fmt.Sprintf("%v", outputEntries[rule.Parameters])
portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntries[rule.Input]))
protocol := fmt.Sprintf("%v", outputEntry[rule.Parameters])
portNumber, err := strconv.Atoi(fmt.Sprintf("%v", outputEntry[rule.Input]))
if err != nil {
log.Errorf("Can't convert port to int: Port %v - err %v", outputEntries[rule.Input], err)
log.Errorf("Can't convert port to int: Port %v - err %v", outputEntry[rule.Input], err)
continue
}
service := netdb.GetServByPort(portNumber, netdb.GetProtoByName(protocol))
if service == nil {
protocolAsNumber, err := strconv.Atoi(fmt.Sprintf("%v", protocol))
if err != nil {
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err)
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err)
continue
}
service = netdb.GetServByPort(portNumber, netdb.GetProtoByNumber(protocolAsNumber))
if service == nil {
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntries[rule.Input], protocol, err)
log.Infof("Can't find service name for Port %v and protocol %v - err %v", outputEntry[rule.Input], protocol, err)
continue
}
}
outputEntries[rule.Output] = service.Name
outputEntry[rule.Output] = service.Name
case api.TransformNetworkOperationName("AddKubernetes"):
var kubeInfo *kubernetes.Info
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntries[rule.Input]))
kubeInfo, err := kubernetes.Data.GetInfo(fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntries[rule.Input], err)
log.Debugf("Can't find kubernetes info for IP %v err %v", outputEntry[rule.Input], err)
continue
}
outputEntries[rule.Output+"_Namespace"] = kubeInfo.Namespace
outputEntries[rule.Output+"_Name"] = kubeInfo.Name
outputEntries[rule.Output+"_Type"] = kubeInfo.Type
outputEntries[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntries[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
outputEntry[rule.Output+"_Namespace"] = kubeInfo.Namespace
outputEntry[rule.Output+"_Name"] = kubeInfo.Name
outputEntry[rule.Output+"_Type"] = kubeInfo.Type
outputEntry[rule.Output+"_OwnerName"] = kubeInfo.Owner.Name
outputEntry[rule.Output+"_OwnerType"] = kubeInfo.Owner.Type
if rule.Parameters != "" {
for labelKey, labelValue := range kubeInfo.Labels {
outputEntries[rule.Parameters+"_"+labelKey] = labelValue
outputEntry[rule.Parameters+"_"+labelKey] = labelValue
}
}
if kubeInfo.HostIP != "" {
outputEntries[rule.Output+"_HostIP"] = kubeInfo.HostIP
outputEntry[rule.Output+"_HostIP"] = kubeInfo.HostIP
if kubeInfo.HostName != "" {
outputEntries[rule.Output+"_HostName"] = kubeInfo.HostName
outputEntry[rule.Output+"_HostName"] = kubeInfo.HostName
}
}
default:
log.Panicf("unknown type %s for transform.Network rule: %v", rule.Type, rule)
}
}

return outputEntries
return outputEntry
}

// NewTransformNetwork create a new transform
Expand Down

0 comments on commit e99ca07

Please sign in to comment.