diff --git a/Makefile b/Makefile index 0758671f8..3b77a3ee4 100644 --- a/Makefile +++ b/Makefile @@ -121,6 +121,7 @@ build: validate_go lint build_code docs ## Build flowlogs-pipeline executable an docs: FORCE ## Update flowlogs-pipeline documentation @./hack/update-docs.sh @go run cmd/apitodoc/main.go > docs/api.md + @./hack/update-enum-docs.sh @go run cmd/operationalmetricstodoc/main.go > docs/operational-metrics.md .PHONY: clean diff --git a/README.md b/README.md index ab6373193..39d9a1f57 100644 --- a/README.md +++ b/README.md @@ -374,9 +374,11 @@ pipeline: - type: filter filter: rules: - - input: SrcPort - type: remove_entry_if_exists + - type: remove_entry_if_exists + removeEntryIfExists: + input: SrcPort ``` + Using `remove_entry_if_doesnt_exist` in the rule reverses the logic and will not remove the above example entry Using `remove_field` in the rule `type` instead, results in outputting the entry after removal of only the `SrcPort` key and value diff --git a/cmd/apitodoc/main.go b/cmd/apitodoc/main.go index fb2e0002b..40134e35d 100644 --- a/cmd/apitodoc/main.go +++ b/cmd/apitodoc/main.go @@ -19,6 +19,7 @@ package main import ( "bytes" + "errors" "fmt" "io" "reflect" @@ -29,9 +30,12 @@ import ( func iterate(output io.Writer, data interface{}, indent int) { newIndent := indent + 1 - dataType := reflect.ValueOf(data).Kind() - // DEBUG code: dataTypeName := reflect.ValueOf(data).Type().String() d := reflect.ValueOf(data) + dataType := d.Kind() + dataTypeName, err := getTypeName(d) + if err != nil { + dataTypeName = "(unknown)" + } if dataType == reflect.Slice || dataType == reflect.Map { // DEBUG code: fmt.Fprintf(output, "%s %s <-- %s \n",strings.Repeat(" ",4*indent),dataTypeName,dataType ) zeroElement := reflect.Zero(reflect.ValueOf(data).Type().Elem()).Interface() @@ -43,17 +47,8 @@ func iterate(output io.Writer, data interface{}, indent int) { val := reflect.Indirect(reflect.ValueOf(data)) fieldName := val.Type().Field(i).Tag.Get(api.TagYaml) fieldName = strings.ReplaceAll(fieldName, ",omitempty", "") - fieldDocTag := val.Type().Field(i).Tag.Get(api.TagDoc) - fieldEnumTag := val.Type().Field(i).Tag.Get(api.TagEnum) - if fieldEnumTag != "" { - enumType := api.GetEnumReflectionTypeByFieldName(fieldEnumTag) - zeroElement := reflect.Zero(enumType).Interface() - fmt.Fprintf(output, "%s %s: (enum) %s\n", strings.Repeat(" ", 4*newIndent), fieldName, fieldDocTag) - iterate(output, zeroElement, newIndent) - continue - } if fieldDocTag != "" { if fieldDocTag[0:1] == "#" { fmt.Fprintf(output, "\n%s\n", fieldDocTag) @@ -75,9 +70,22 @@ func iterate(output io.Writer, data interface{}, indent int) { // Since we only "converted" Ptr to Struct and the actual output is done in the next iteration, we call // iterate() with the same `indent` as the current level iterate(output, zeroElement, indent) + } else if strings.HasPrefix(dataTypeName, "api.") && strings.HasSuffix(dataTypeName, "Enum") { + // set placeholder for enum + fmt.Fprintf(output, "placeholder @%s:%d@\n", strings.TrimPrefix(dataTypeName, "api."), 4*newIndent) } } +func getTypeName(d reflect.Value) (name string, err error) { + defer func() { + if recover() != nil { + err = errors.New("unknown type name") + } + }() + name = d.Type().String() + return +} + func main() { output := new(bytes.Buffer) iterate(output, api.API{}, 0) diff --git a/docs/api.md b/docs/api.md index 26ae6e93c..456e9e8e7 100644 --- a/docs/api.md +++ b/docs/api.md @@ -13,20 +13,20 @@ Following is the supported API format for prometheus encode: metrics: list of prometheus metric definitions, each includes: name: the metric name type: (enum) one of the following: - gauge: single numerical value that can arbitrarily go up and down - counter: monotonically increasing counter whose value can only increase - histogram: counts samples in configurable buckets - agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage + gauge: single numerical value that can arbitrarily go up and down + counter: monotonically increasing counter whose value can only increase + histogram: counts samples in configurable buckets + agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage filters: a list of criteria to filter entries by key: the key to match and filter by value: the value to match and filter by - type: (enum) the type of filter match: equal (default), not_equal, presence, absence, match_regex or not_match_regex - equal: match exactly the provided filter value - not_equal: the value must be different from the provided filter - presence: filter key must be present (filter value is ignored) - absence: filter key must be absent (filter value is ignored) - match_regex: match filter value as a regular expression - not_match_regex: the filter value must not match the provided regular expression + type: the type of filter match (enum) + equal: match exactly the provided filter value + not_equal: the value must be different from the provided filter + presence: filter key must be present (filter value is ignored) + absence: filter key must be absent (filter value is ignored) + match_regex: match filter value as a regular expression + not_match_regex: the filter value must not match the provided regular expression valueKey: entry key from which to resolve metric value labels: labels to be associated with the metric buckets: histogram buckets @@ -43,11 +43,11 @@ Following is the supported API format for kafka encode: address: address of kafka server topic: kafka topic to write to balancer: (enum) one of the following: - roundRobin: RoundRobin balancer - leastBytes: LeastBytes balancer - hash: Hash balancer - crc32: Crc32 balancer - murmur2: Murmur2 balancer + roundRobin: RoundRobin balancer + leastBytes: LeastBytes balancer + hash: Hash balancer + crc32: Crc32 balancer + murmur2: Murmur2 balancer writeTimeout: timeout (in seconds) for write operation performed by the Writer readTimeout: timeout (in seconds) for read operation performed by the Writer batchBytes: limit the maximum size of a request in bytes before being sent to a partition @@ -58,6 +58,9 @@ Following is the supported API format for kafka encode: userCertPath: path to the user certificate userKeyPath: path to the user private key sasl: SASL configuration (optional) + type: SASL type + plain: Plain SASL + scramSHA512: SCRAM/SHA512 SASL clientIDPath: path to the client ID / SASL username clientSecretPath: path to the client secret / SASL password @@ -99,8 +102,8 @@ Following is the supported API format for the kafka ingest: batchReadTimeout: how often (in milliseconds) to process input decoder: decoder to use (E.g. json or protobuf) type: (enum) one of the following: - json: JSON decoder - protobuf: Protobuf decoder + json: JSON decoder + protobuf: Protobuf decoder batchMaxLen: the number of accumulated flows before being forwarded for processing pullQueueCapacity: the capacity of the queue use to store pulled flows pullMaxBytes: the maximum number of bytes being pulled from kafka @@ -111,6 +114,9 @@ Following is the supported API format for the kafka ingest: userCertPath: path to the user certificate userKeyPath: path to the user private key sasl: SASL configuration (optional) + type: SASL type + plain: Plain SASL + scramSHA512: SCRAM/SHA512 SASL clientIDPath: path to the client ID / SASL username clientSecretPath: path to the client secret / SASL password @@ -134,8 +140,8 @@ Following is the supported API format for generic transformations:
  generic:
          policy: (enum) key replacement policy; may be one of the following:
-             preserve_original_keys: adds new keys in addition to existing keys (default)
-             replace_keys: removes all old keys and uses only the new keys
+            preserve_original_keys: adds new keys in addition to existing keys (default)
+            replace_keys: removes all old keys and uses only the new keys
          rules: list of transform rules, each includes:
                  input: entry input field
                  output: entry output field
@@ -147,23 +153,57 @@ Following is the supported API format for filter transformations:
 
  filter:
          rules: list of filter rules, each includes:
-                 input: entry input field
-                 output: entry output field
                  type: (enum) one of the following:
-                     remove_field: removes the field from the entry
-                     remove_entry_if_exists: removes the entry if the field exists
-                     remove_entry_if_doesnt_exist: removes the entry if the field does not exist
-                     remove_entry_if_equal: removes the entry if the field value equals specified value
-                     remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
-                     add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
-                     add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
-                     add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
-                     add_regex_if: add output field if input field satisfies regex pattern from parameters field
-                     add_label: add (input) field to list of labels with value taken from Value field (key=input, value=value)
-                     add_label_if: add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
-                 value: specified value of input field:
-                 parameters: parameters specific to type
-                 assignee: value needs to assign to output field
+                    remove_field: removes the field from the entry
+                    remove_entry_if_exists: removes the entry if the field exists
+                    remove_entry_if_doesnt_exist: removes the entry if the field does not exist
+                    remove_entry_if_equal: removes the entry if the field value equals specified value
+                    remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
+                    add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
+                    add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
+                    add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
+                    add_regex_if: add output field if input field satisfies regex pattern from parameters field
+                    add_label: add (input) field to list of labels with value taken from Value field (key=input, value=value)
+                    add_label_if: add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
+                 removeField: configuration for remove_field rule
+                     input: entry input field
+                     value: specified value of input field:
+                 removeEntryIfExists: configuration for remove_entry_if_exists rule
+                     input: entry input field
+                     value: specified value of input field:
+                 removeEntryIfDoesntExist: configuration for remove_entry_if_doesnt_exist rule
+                     input: entry input field
+                     value: specified value of input field:
+                 removeEntryIfEqual: configuration for remove_entry_if_equal rule
+                     input: entry input field
+                     value: specified value of input field:
+                 removeEntryIfNotEqual: configuration for remove_entry_if_not_equal rule
+                     input: entry input field
+                     value: specified value of input field:
+                 addField: configuration for add_field rule
+                     input: entry input field
+                     value: specified value of input field:
+                 addFieldIfDoesntExist: configuration for add_field_if_doesnt_exist rule
+                     input: entry input field
+                     value: specified value of input field:
+                 addFieldIf: configuration for add_field_if rule
+                     input: entry input field
+                     output: entry output field
+                     parameters: parameters specific to type
+                     assignee: value needs to assign to output field
+                 addRegexIf: configuration for add_regex_if rule
+                     input: entry input field
+                     output: entry output field
+                     parameters: parameters specific to type
+                     assignee: value needs to assign to output field
+                 addLabel: configuration for add_label rule
+                     input: entry input field
+                     value: specified value of input field:
+                 addLabelIf: configuration for add_label_if rule
+                     input: entry input field
+                     output: entry output field
+                     parameters: parameters specific to type
+                     assignee: value needs to assign to output field
 
## Transform Network API Following is the supported API format for network transformations: @@ -171,24 +211,41 @@ Following is the supported API format for network transformations:
  network:
          rules: list of transform rules, each includes:
-                 input: entry input field
-                 output: entry output field
                  type: (enum) one of the following:
-                     add_subnet: add output subnet field from input field and prefix length from parameters field
-                     add_location: add output location fields from input
-                     add_service: add output network service field from input port and parameters protocol field
-                     add_kubernetes: add output kubernetes fields from input
-                     add_kubernetes_infra: add output kubernetes isInfra field from input
-                     reinterpret_direction: reinterpret flow direction at the node level (instead of net interface), to ease the deduplication process
-                     add_ip_category: categorize IPs based on known subnets configuration
-                 parameters: parameters specific to type
-                 assignee: value needs to assign to output field
-                 kubernetes_infra: Kubernetes infra rule specific configuration
+                    add_subnet: add output subnet field from input field and prefix length from parameters field
+                    add_location: add output location fields from input
+                    add_service: add output network service field from input port and parameters protocol field
+                    add_kubernetes: add output kubernetes fields from input
+                    add_kubernetes_infra: add output kubernetes isInfra field from input
+                    reinterpret_direction: reinterpret flow direction at the node level (instead of net interface), to ease the deduplication process
+                    add_ip_category: categorize IPs based on known subnets configuration
+                 kubernetes_infra: Kubernetes infra rule configuration
                      inputs: entry inputs fields
                      output: entry output field
                      infra_prefixes: Namespace prefixes that will be tagged as infra
-                 kubernetes: Kubernetes rule specific configuration
+                     infra_refs: Additional object references to be tagged as infra
+                             name: name of the object
+                             namespace: namespace of the object
+                 kubernetes: Kubernetes rule configuration
+                     input: entry input field
+                     output: entry output field
+                     assignee: value needs to assign to output field
+                     labels_prefix: labels prefix to use to copy input lables, if empty labels will not be copied
                      add_zone: If true the rule will add the zone
+                 add_subnet: Add subnet rule configuration
+                     input: entry input field
+                     output: entry output field
+                     subnet_mask: subnet mask field
+                 add_location: Add location rule configuration
+                     input: entry input field
+                     output: entry output field
+                 add_ip_category: Add ip category rule configuration
+                     input: entry input field
+                     output: entry output field
+                 add_service: Add service rule configuration
+                     input: entry input field
+                     output: entry output field
+                     protocol: entry protocol field
          kubeConfigPath: path to kubeconfig file (optional)
          servicesFile: path to services file (optional, default: /etc/services)
          protocolsFile: path to protocols file (optional, default: /etc/protocols)
@@ -256,19 +313,19 @@ Following is the supported API format for specifying connection tracking:
                  fieldGroupARef: field group name of endpoint A
                  fieldGroupBRef: field group name of endpoint B
          outputRecordTypes: (enum) output record types to emit
-             newConnection: New connection
-             endConnection: End connection
-             heartbeat: Heartbeat
-             flowLog: Flow log
+                newConnection: New connection
+                endConnection: End connection
+                heartbeat: Heartbeat
+                flowLog: Flow log
          outputFields: list of output fields
                  name: output field name
                  operation: (enum) aggregate operation on the field value
-                     sum: sum
-                     count: count
-                     min: min
-                     max: max
-                     first: first
-                     last: last
+                    sum: sum
+                    count: count
+                    min: min
+                    max: max
+                    first: first
+                    last: last
                  splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
                  input: The input field to base the operation on. When omitted, 'name' is used
                  reportMissing: When true, missing input will produce MissingFieldError metric and error logs
@@ -293,13 +350,13 @@ Following is the supported API format for specifying metrics time-based filters:
                  indexKey: internal field to index TopK. Deprecated, use indexKeys instead
                  indexKeys: internal fields to index TopK
                  operationType: (enum) sum, min, max, avg, count, last or diff
-                     sum: set output field to sum of parameters fields in the time window
-                     avg: set output field to average of parameters fields in the time window
-                     min: set output field to minimum of parameters fields in the time window
-                     max: set output field to maximum of parameters fields in the time window
-                     count: set output field to number of flows registered in the time window
-                     last: set output field to last of parameters fields in the time window
-                     diff: set output field to the difference of the first and last parameters fields in the time window
+                    sum: set output field to sum of parameters fields in the time window
+                    avg: set output field to average of parameters fields in the time window
+                    min: set output field to minimum of parameters fields in the time window
+                    max: set output field to maximum of parameters fields in the time window
+                    count: set output field to number of flows registered in the time window
+                    last: set output field to last of parameters fields in the time window
+                    diff: set output field to the difference of the first and last parameters fields in the time window
                  operationKey: internal field on which to perform the operation
                  topK: number of highest incidence to report (default - report all)
                  reversed: report lowest incidence instead of highest (default - false)
@@ -340,20 +397,20 @@ Following is the supported API format for writing metrics to an OpenTelemetry co
          metrics: list of metric definitions, each includes:
                  name: the metric name
                  type: (enum) one of the following:
-                     gauge: single numerical value that can arbitrarily go up and down
-                     counter: monotonically increasing counter whose value can only increase
-                     histogram: counts samples in configurable buckets
-                     agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage
+                    gauge: single numerical value that can arbitrarily go up and down
+                    counter: monotonically increasing counter whose value can only increase
+                    histogram: counts samples in configurable buckets
+                    agg_histogram: counts samples in configurable buckets, pre-aggregated via an Aggregate stage
                  filters: a list of criteria to filter entries by
                          key: the key to match and filter by
                          value: the value to match and filter by
-                         type: (enum) the type of filter match: equal (default), not_equal, presence, absence, match_regex or not_match_regex
-                             equal: match exactly the provided filter value
-                             not_equal: the value must be different from the provided filter
-                             presence: filter key must be present (filter value is ignored)
-                             absence: filter key must be absent (filter value is ignored)
-                             match_regex: match filter value as a regular expression
-                             not_match_regex: the filter value must not match the provided regular expression
+                         type: the type of filter match (enum)
+                            equal: match exactly the provided filter value
+                            not_equal: the value must be different from the provided filter
+                            presence: filter key must be present (filter value is ignored)
+                            absence: filter key must be absent (filter value is ignored)
+                            match_regex: match filter value as a regular expression
+                            not_match_regex: the filter value must not match the provided regular expression
                  valueKey: entry key from which to resolve metric value
                  labels: labels to be associated with the metric
                  buckets: histogram buckets
@@ -377,4 +434,4 @@ Following is the supported API format for writing traces to an OpenTelemetry col
                  userKeyPath: path to the user private key
              headers: headers to add to messages (optional)
          spanSplitter: separate span for each prefix listed
-
\ No newline at end of file +
diff --git a/hack/update-enum-docs.sh b/hack/update-enum-docs.sh new file mode 100755 index 000000000..e1b9544fb --- /dev/null +++ b/hack/update-enum-docs.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +set -eou pipefail + +md_file="./docs/api.md" + +grep "placeholder @" "$md_file" | while read line; do + type=$(echo "$line" | sed -r 's/^placeholder @(.*):(.*)@/\1/') + indent=$(echo "$line" | sed -r 's/^placeholder @(.*):(.*)@/\2/') + repl=$(go doc -all -short -C pkg/api $type | grep "${type} =" | sed -r "s/^.*\"(.*)\".*\/\/ (.*)/$(printf "%${indent}s")\1: \2/") + awk -v inject="${repl}" "/placeholder @$type:$indent@/{print inject;next}1" $md_file > "$md_file.tmp" + rm $md_file + mv "$md_file.tmp" $md_file +done diff --git a/pkg/api/api.go b/pkg/api/api.go index 93535c27a..711474bbe 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -18,47 +18,33 @@ package api const ( - FileType = "file" - FileLoopType = "file_loop" - FileChunksType = "file_chunks" - SyntheticType = "synthetic" - CollectorType = "collector" - StdinType = "stdin" - GRPCType = "grpc" - FakeType = "fake" - KafkaType = "kafka" - S3Type = "s3" - OtlpLogsType = "otlplogs" - OtlpMetricsType = "otlpmetrics" - OtlpTracesType = "otlptraces" - StdoutType = "stdout" - LokiType = "loki" - IpfixType = "ipfix" - AggregateType = "aggregates" - TimebasedType = "timebased" - PromType = "prom" - GenericType = "generic" - NetworkType = "network" - FilterType = "filter" - ConnTrackType = "conntrack" - NoneType = "none" - AddRegExIfRuleType = "add_regex_if" - AddSubnetRuleType = "add_subnet" - AddLocationRuleType = "add_location" - AddServiceRuleType = "add_service" - AddKubernetesRuleType = "add_kubernetes" - AddKubernetesInfraRuleType = "add_kubernetes_infra" - ReinterpretDirectionRuleType = "reinterpret_direction" - PromFilterEqual = "equal" - PromFilterNotEqual = "not_equal" - PromFilterPresence = "presence" - PromFilterAbsence = "absence" - PromFilterRegex = "match_regex" - PromFilterNotRegex = "not_match_regex" + FileType = "file" + FileLoopType = "file_loop" + FileChunksType = "file_chunks" + SyntheticType = "synthetic" + CollectorType = "collector" + StdinType = "stdin" + GRPCType = "grpc" + FakeType = "fake" + KafkaType = "kafka" + S3Type = "s3" + OtlpLogsType = "otlplogs" + OtlpMetricsType = "otlpmetrics" + OtlpTracesType = "otlptraces" + StdoutType = "stdout" + LokiType = "loki" + IpfixType = "ipfix" + AggregateType = "aggregates" + TimebasedType = "timebased" + PromType = "prom" + GenericType = "generic" + NetworkType = "network" + FilterType = "filter" + ConnTrackType = "conntrack" + NoneType = "none" TagYaml = "yaml" TagDoc = "doc" - TagEnum = "enum" ) // Note: items beginning with doc: "## title" are top level items that get divided into sections inside api.md. diff --git a/pkg/api/conntrack.go b/pkg/api/conntrack.go index 3edd7c5e6..cd7089580 100644 --- a/pkg/api/conntrack.go +++ b/pkg/api/conntrack.go @@ -28,24 +28,23 @@ const ( ) type ConnTrack struct { - KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" json:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"` - OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" json:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"` - OutputFields []OutputField `yaml:"outputFields,omitempty" json:"outputFields,omitempty" doc:"list of output fields"` - Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" json:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"` - MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" json:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"` - TCPFlags ConnTrackTCPFlags `yaml:"tcpFlags,omitempty" json:"tcpFlags,omitempty" doc:"settings for handling TCP flags"` + KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" json:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"` + OutputRecordTypes []ConnTrackOutputRecordTypeEnum `yaml:"outputRecordTypes,omitempty" json:"outputRecordTypes,omitempty" doc:"(enum) output record types to emit"` + OutputFields []OutputField `yaml:"outputFields,omitempty" json:"outputFields,omitempty" doc:"list of output fields"` + Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" json:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"` + MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" json:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"` + TCPFlags ConnTrackTCPFlags `yaml:"tcpFlags,omitempty" json:"tcpFlags,omitempty" doc:"settings for handling TCP flags"` } -type ConnTrackOutputRecordTypeEnum struct { - NewConnection string `yaml:"newConnection" json:"newConnection" doc:"New connection"` - EndConnection string `yaml:"endConnection" json:"endConnection" doc:"End connection"` - Heartbeat string `yaml:"heartbeat" json:"heartbeat" doc:"Heartbeat"` - FlowLog string `yaml:"flowLog" json:"flowLog" doc:"Flow log"` -} +type ConnTrackOutputRecordTypeEnum string -func ConnTrackOutputRecordTypeName(operation string) string { - return GetEnumName(ConnTrackOutputRecordTypeEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + ConnTrackNewConnection ConnTrackOutputRecordTypeEnum = "newConnection" // New connection + ConnTrackEndConnection ConnTrackOutputRecordTypeEnum = "endConnection" // End connection + ConnTrackHeartbeat ConnTrackOutputRecordTypeEnum = "heartbeat" // Heartbeat + ConnTrackFlowLog ConnTrackOutputRecordTypeEnum = "flowLog" // Flow log +) type KeyDefinition struct { FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" json:"fieldGroups,omitempty" doc:"list of field group definitions"` @@ -70,21 +69,24 @@ type ConnTrackHash struct { } type OutputField struct { - Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"` - Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"` - SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."` - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"` - ReportMissing bool `yaml:"reportMissing,omitempty" json:"reportMissing,omitempty" doc:"When true, missing input will produce MissingFieldError metric and error logs"` + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"` + Operation ConnTrackOperationEnum `yaml:"operation,omitempty" json:"operation,omitempty" doc:"(enum) aggregate operation on the field value"` + SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."` + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"` + ReportMissing bool `yaml:"reportMissing,omitempty" json:"reportMissing,omitempty" doc:"When true, missing input will produce MissingFieldError metric and error logs"` } -type ConnTrackOperationEnum struct { - Sum string `yaml:"sum" json:"sum" doc:"sum"` - Count string `yaml:"count" json:"count" doc:"count"` - Min string `yaml:"min" json:"min" doc:"min"` - Max string `yaml:"max" json:"max" doc:"max"` - First string `yaml:"first" json:"first" doc:"first"` - Last string `yaml:"last" json:"last" doc:"last"` -} +type ConnTrackOperationEnum string + +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + ConnTrackSum ConnTrackOperationEnum = "sum" // sum + ConnTrackCount ConnTrackOperationEnum = "count" // count + ConnTrackMin ConnTrackOperationEnum = "min" // min + ConnTrackMax ConnTrackOperationEnum = "max" // max + ConnTrackFirst ConnTrackOperationEnum = "first" // first + ConnTrackLast ConnTrackOperationEnum = "last" // last +) type ConnTrackSchedulingGroup struct { Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"` @@ -93,10 +95,6 @@ type ConnTrackSchedulingGroup struct { HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"` } -func ConnTrackOperationName(operation string) string { - return GetEnumName(ConnTrackOperationEnum{}, operation) -} - type ConnTrackTCPFlags struct { FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"` DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN flag"` @@ -255,14 +253,14 @@ func addToSet(set map[string]struct{}, item string) bool { return true } -func isOperationValid(value string, splitAB bool) bool { +func isOperationValid(value ConnTrackOperationEnum, splitAB bool) bool { valid := true switch value { - case ConnTrackOperationName("Sum"): - case ConnTrackOperationName("Count"): - case ConnTrackOperationName("Min"): - case ConnTrackOperationName("Max"): - case ConnTrackOperationName("First"), ConnTrackOperationName("Last"): + case ConnTrackSum: + case ConnTrackCount: + case ConnTrackMin: + case ConnTrackMax: + case ConnTrackFirst, ConnTrackLast: valid = !splitAB default: valid = false @@ -270,13 +268,13 @@ func isOperationValid(value string, splitAB bool) bool { return valid } -func isOutputRecordTypeValid(value string) bool { +func isOutputRecordTypeValid(value ConnTrackOutputRecordTypeEnum) bool { valid := true switch value { - case ConnTrackOutputRecordTypeName("NewConnection"): - case ConnTrackOutputRecordTypeName("EndConnection"): - case ConnTrackOutputRecordTypeName("Heartbeat"): - case ConnTrackOutputRecordTypeName("FlowLog"): + case ConnTrackNewConnection: + case ConnTrackEndConnection: + case ConnTrackHeartbeat: + case ConnTrackFlowLog: default: valid = false } diff --git a/pkg/api/conntrack_test.go b/pkg/api/conntrack_test.go index 1d866014b..4c84c99d8 100644 --- a/pkg/api/conntrack_test.go +++ b/pkg/api/conntrack_test.go @@ -141,7 +141,7 @@ func TestConnTrackValidate(t *testing.T) { { "Unknown output record", ConnTrack{ - OutputRecordTypes: []string{"unknown"}, + OutputRecordTypes: []ConnTrackOutputRecordTypeEnum{"unknown"}, }, conntrackInvalidError{unknownOutputRecord: true}, }, diff --git a/pkg/api/decoder.go b/pkg/api/decoder.go index cf6a2f835..8772c06e6 100644 --- a/pkg/api/decoder.go +++ b/pkg/api/decoder.go @@ -1,14 +1,13 @@ package api type Decoder struct { - Type string `yaml:"type" json:"type" enum:"DecoderEnum" doc:"one of the following:"` + Type DecoderEnum `yaml:"type" json:"type" doc:"(enum) one of the following:"` } -type DecoderEnum struct { - JSON string `yaml:"json" json:"json" doc:"JSON decoder"` - Protobuf string `yaml:"protobuf" json:"protobuf" doc:"Protobuf decoder"` -} +type DecoderEnum string -func DecoderName(decoder string) string { - return GetEnumName(DecoderEnum{}, decoder) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + DecoderJSON DecoderEnum = "json" // JSON decoder + DecoderProtobuf DecoderEnum = "protobuf" // Protobuf decoder +) diff --git a/pkg/api/encode_kafka.go b/pkg/api/encode_kafka.go index f03f9faec..d136bcbed 100644 --- a/pkg/api/encode_kafka.go +++ b/pkg/api/encode_kafka.go @@ -18,25 +18,24 @@ package api type EncodeKafka struct { - Address string `yaml:"address" json:"address" doc:"address of kafka server"` - Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"` - Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"` - WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"` - ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"` - BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"` - BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"` - TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"` - SASL *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"` + Address string `yaml:"address" json:"address" doc:"address of kafka server"` + Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"` + Balancer KafkaEncodeBalancerEnum `yaml:"balancer,omitempty" json:"balancer,omitempty" doc:"(enum) one of the following:"` + WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"` + ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"` + BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"` + BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"` + TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"` + SASL *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"` } -type KafkaEncodeBalancerEnum struct { - RoundRobin string `yaml:"roundRobin" json:"roundRobin" doc:"RoundRobin balancer"` - LeastBytes string `yaml:"leastBytes" json:"leastBytes" doc:"LeastBytes balancer"` - Hash string `yaml:"hash" json:"hash" doc:"Hash balancer"` - Crc32 string `yaml:"crc32" json:"crc32" doc:"Crc32 balancer"` - Murmur2 string `yaml:"murmur2" json:"murmur2" doc:"Murmur2 balancer"` -} +type KafkaEncodeBalancerEnum string -func KafkaEncodeBalancerName(operation string) string { - return GetEnumName(KafkaEncodeBalancerEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + KafkaRoundRobin KafkaEncodeBalancerEnum = "roundRobin" // RoundRobin balancer + KafkaLeastBytes KafkaEncodeBalancerEnum = "leastBytes" // LeastBytes balancer + KafkaHash KafkaEncodeBalancerEnum = "hash" // Hash balancer + KafkaCrc32 KafkaEncodeBalancerEnum = "crc32" // Crc32 balancer + KafkaMurmur2 KafkaEncodeBalancerEnum = "murmur2" // Murmur2 balancer +) diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 1e54672f9..b17628cc7 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -30,16 +30,15 @@ type PromEncode struct { MaxMetrics int `yaml:"maxMetrics,omitempty" json:"maxMetrics,omitempty" doc:"maximum number of metrics to report (default: unlimited)"` } -type MetricEncodeOperationEnum struct { - Gauge string `yaml:"gauge" json:"gauge" doc:"single numerical value that can arbitrarily go up and down"` - Counter string `yaml:"counter" json:"counter" doc:"monotonically increasing counter whose value can only increase"` - Histogram string `yaml:"histogram" json:"histogram" doc:"counts samples in configurable buckets"` - AggHistogram string `yaml:"agg_histogram" json:"agg_histogram" doc:"counts samples in configurable buckets, pre-aggregated via an Aggregate stage"` -} +type MetricEncodeOperationEnum string -func MetricEncodeOperationName(operation string) string { - return GetEnumName(MetricEncodeOperationEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + MetricGauge MetricEncodeOperationEnum = "gauge" // single numerical value that can arbitrarily go up and down + MetricCounter MetricEncodeOperationEnum = "counter" // monotonically increasing counter whose value can only increase + MetricHistogram MetricEncodeOperationEnum = "histogram" // counts samples in configurable buckets + MetricAggHistogram MetricEncodeOperationEnum = "agg_histogram" // counts samples in configurable buckets, pre-aggregated via an Aggregate stage +) type PromConnectionInfo struct { Address string `yaml:"address,omitempty" json:"address,omitempty" doc:"endpoint address to expose"` @@ -48,32 +47,30 @@ type PromConnectionInfo struct { } type MetricsItem struct { - Name string `yaml:"name" json:"name" doc:"the metric name"` - Type string `yaml:"type" json:"type" enum:"MetricEncodeOperationEnum" doc:"one of the following:"` - Filters []MetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"` - ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` - Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` - ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` + Name string `yaml:"name" json:"name" doc:"the metric name"` + Type MetricEncodeOperationEnum `yaml:"type" json:"type" doc:"(enum) one of the following:"` + Filters []MetricsFilter `yaml:"filters" json:"filters" doc:"a list of criteria to filter entries by"` + ValueKey string `yaml:"valueKey" json:"valueKey" doc:"entry key from which to resolve metric value"` + Labels []string `yaml:"labels" json:"labels" doc:"labels to be associated with the metric"` + Buckets []float64 `yaml:"buckets" json:"buckets" doc:"histogram buckets"` + ValueScale float64 `yaml:"valueScale,omitempty" json:"valueScale,omitempty" doc:"scale factor of the value (MetricVal := FlowVal / Scale)"` } type MetricsItems []MetricsItem +type MetricFilterEnum string -type MetricsFilter struct { - Key string `yaml:"key" json:"key" doc:"the key to match and filter by"` - Value string `yaml:"value" json:"value" doc:"the value to match and filter by"` - Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"MetricEncodeFilterTypeEnum" doc:"the type of filter match: equal (default), not_equal, presence, absence, match_regex or not_match_regex"` -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + MetricFilterEqual MetricFilterEnum = "equal" // match exactly the provided filter value + MetricFilterNotEqual MetricFilterEnum = "not_equal" // the value must be different from the provided filter + MetricFilterPresence MetricFilterEnum = "presence" // filter key must be present (filter value is ignored) + MetricFilterAbsence MetricFilterEnum = "absence" // filter key must be absent (filter value is ignored) + MetricFilterRegex MetricFilterEnum = "match_regex" // match filter value as a regular expression + MetricFilterNotRegex MetricFilterEnum = "not_match_regex" // the filter value must not match the provided regular expression +) -type MetricEncodeFilterTypeEnum struct { - Equal string `yaml:"equal" json:"equal" doc:"match exactly the provided filter value"` - NotEqual string `yaml:"not_equal" json:"not_equal" doc:"the value must be different from the provided filter"` - Presence string `yaml:"presence" json:"presence" doc:"filter key must be present (filter value is ignored)"` - Absence string `yaml:"absence" json:"absence" doc:"filter key must be absent (filter value is ignored)"` - MatchRegex string `yaml:"match_regex" json:"match_regex" doc:"match filter value as a regular expression"` - NotMatchRegex string `yaml:"not_match_regex" json:"not_match_regex" doc:"the filter value must not match the provided regular expression"` -} - -func MetricEncodeFilterTypeName(t string) string { - return GetEnumName(MetricEncodeFilterTypeEnum{}, t) +type MetricsFilter struct { + Key string `yaml:"key" json:"key" doc:"the key to match and filter by"` + Value string `yaml:"value" json:"value" doc:"the value to match and filter by"` + Type MetricFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"the type of filter match (enum)"` } diff --git a/pkg/api/enum.go b/pkg/api/enum.go deleted file mode 100644 index 12b4841d6..000000000 --- a/pkg/api/enum.go +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (C) 2022 IBM, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package api - -import ( - "log" - "reflect" -) - -type enums struct { - MetricEncodeOperationEnum MetricEncodeOperationEnum - MetricEncodeFilterTypeEnum MetricEncodeFilterTypeEnum - TransformNetworkOperationEnum TransformNetworkOperationEnum - TransformFilterOperationEnum TransformFilterOperationEnum - TransformGenericOperationEnum TransformGenericOperationEnum - KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum - SASLTypeEnum SASLTypeEnum - ConnTrackOperationEnum ConnTrackOperationEnum - ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum - DecoderEnum DecoderEnum - FilterOperationEnum FilterOperationEnum -} - -type enumNameCacheKey struct { - enum interface{} - operation string -} - -var enumNamesCache = map[enumNameCacheKey]string{} - -func init() { - populateEnumCache() -} - -func populateEnumCache() { - enumStruct := enums{} - e := reflect.ValueOf(&enumStruct).Elem() - for i := 0; i < e.NumField(); i++ { - eType := e.Type().Field(i).Type - eValue := e.Field(i).Interface() - for j := 0; j < eType.NumField(); j++ { - fName := eType.Field(j).Name - key := enumNameCacheKey{enum: eValue, operation: fName} - d := reflect.ValueOf(eValue) - field, _ := d.Type().FieldByName(fName) - tag := field.Tag.Get(TagYaml) - enumNamesCache[key] = tag - } - } -} - -// GetEnumName gets the name of an enum value from the representing enum struct based on `TagYaml` tag. -func GetEnumName(enum interface{}, operation string) string { - key := enumNameCacheKey{enum: enum, operation: operation} - cachedValue, found := enumNamesCache[key] - if found { - return cachedValue - } - log.Panicf("can't find name '%s' in enum %v", operation, enum) - return "" -} - -// GetEnumReflectionTypeByFieldName gets the enum struct `reflection Type` from the name of the struct (using fields from `enums{}` struct). -func GetEnumReflectionTypeByFieldName(enumName string) reflect.Type { - d := reflect.ValueOf(enums{}) - field, found := d.Type().FieldByName(enumName) - if !found { - log.Panicf("can't find enumName %s in enums", enumName) - return nil - } - - return field.Type -} diff --git a/pkg/api/extract_timebased.go b/pkg/api/extract_timebased.go index 23d749974..8d9605e62 100644 --- a/pkg/api/extract_timebased.go +++ b/pkg/api/extract_timebased.go @@ -17,31 +17,30 @@ package api -type FilterOperationEnum struct { - FilterOperationSum string `yaml:"sum" json:"sum" doc:"set output field to sum of parameters fields in the time window"` - FilterOperationAvg string `yaml:"avg" json:"avg" doc:"set output field to average of parameters fields in the time window"` - FilterOperationMin string `yaml:"min" json:"min" doc:"set output field to minimum of parameters fields in the time window"` - FilterOperationMax string `yaml:"max" json:"max" doc:"set output field to maximum of parameters fields in the time window"` - FilterOperationCnt string `yaml:"count" json:"count" doc:"set output field to number of flows registered in the time window"` - FilterOperationLast string `yaml:"last" json:"last" doc:"set output field to last of parameters fields in the time window"` - FilterOperationDiff string `yaml:"diff" json:"diff" doc:"set output field to the difference of the first and last parameters fields in the time window"` -} +type FilterOperationEnum string -func FilterOperationName(operation string) string { - return GetEnumName(FilterOperationEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + FilterOperationSum FilterOperationEnum = "sum" // set output field to sum of parameters fields in the time window + FilterOperationAvg FilterOperationEnum = "avg" // set output field to average of parameters fields in the time window + FilterOperationMin FilterOperationEnum = "min" // set output field to minimum of parameters fields in the time window + FilterOperationMax FilterOperationEnum = "max" // set output field to maximum of parameters fields in the time window + FilterOperationCnt FilterOperationEnum = "count" // set output field to number of flows registered in the time window + FilterOperationLast FilterOperationEnum = "last" // set output field to last of parameters fields in the time window + FilterOperationDiff FilterOperationEnum = "diff" // set output field to the difference of the first and last parameters fields in the time window +) type ExtractTimebased struct { Rules []TimebasedFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"` } type TimebasedFilterRule struct { - Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of filter result"` - IndexKey string `yaml:"indexKey,omitempty" json:"indexKey,omitempty" doc:"internal field to index TopK. Deprecated, use indexKeys instead"` - IndexKeys []string `yaml:"indexKeys,omitempty" json:"indexKeys,omitempty" doc:"internal fields to index TopK"` - OperationType string `yaml:"operationType,omitempty" json:"operationType,omitempty" enum:"FilterOperationEnum" doc:"sum, min, max, avg, count, last or diff"` - OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"` - TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"` - Reversed bool `yaml:"reversed,omitempty" json:"reversed,omitempty" doc:"report lowest incidence instead of highest (default - false)"` - TimeInterval Duration `yaml:"timeInterval,omitempty" json:"timeInterval,omitempty" doc:"time duration of data to use to compute the metric"` + Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of filter result"` + IndexKey string `yaml:"indexKey,omitempty" json:"indexKey,omitempty" doc:"internal field to index TopK. Deprecated, use indexKeys instead"` + IndexKeys []string `yaml:"indexKeys,omitempty" json:"indexKeys,omitempty" doc:"internal fields to index TopK"` + OperationType FilterOperationEnum `yaml:"operationType,omitempty" json:"operationType,omitempty" doc:"(enum) sum, min, max, avg, count, last or diff"` + OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"` + TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"` + Reversed bool `yaml:"reversed,omitempty" json:"reversed,omitempty" doc:"report lowest incidence instead of highest (default - false)"` + TimeInterval Duration `yaml:"timeInterval,omitempty" json:"timeInterval,omitempty" doc:"time duration of data to use to compute the metric"` } diff --git a/pkg/api/sasl.go b/pkg/api/sasl.go index e5717b0e6..b00544f30 100644 --- a/pkg/api/sasl.go +++ b/pkg/api/sasl.go @@ -1,16 +1,15 @@ package api type SASLConfig struct { - Type string - ClientIDPath string `yaml:"clientIDPath,omitempty" json:"clientIDPath,omitempty" doc:"path to the client ID / SASL username"` - ClientSecretPath string `yaml:"clientSecretPath,omitempty" json:"clientSecretPath,omitempty" doc:"path to the client secret / SASL password"` + Type SASLTypeEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"SASL type"` + ClientIDPath string `yaml:"clientIDPath,omitempty" json:"clientIDPath,omitempty" doc:"path to the client ID / SASL username"` + ClientSecretPath string `yaml:"clientSecretPath,omitempty" json:"clientSecretPath,omitempty" doc:"path to the client secret / SASL password"` } -type SASLTypeEnum struct { - Plain string `yaml:"plain" json:"plain" doc:"Plain SASL"` - ScramSHA512 string `yaml:"scramSHA512" json:"scramSHA512" doc:"SCRAM/SHA512 SASL"` -} +type SASLTypeEnum string -func SASLTypeName(operation string) string { - return GetEnumName(SASLTypeEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + SASLPlain SASLTypeEnum = "plain" // Plain SASL + SASLScramSHA512 SASLTypeEnum = "scramSHA512" // SCRAM/SHA512 SASL +) diff --git a/pkg/api/transform_filter.go b/pkg/api/transform_filter.go index 8029df23d..f2b995fde 100644 --- a/pkg/api/transform_filter.go +++ b/pkg/api/transform_filter.go @@ -21,29 +21,46 @@ type TransformFilter struct { Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"` } -type TransformFilterOperationEnum struct { - RemoveField string `yaml:"remove_field" json:"remove_field" doc:"removes the field from the entry"` - RemoveEntryIfExists string `yaml:"remove_entry_if_exists" json:"remove_entry_if_exists" doc:"removes the entry if the field exists"` - RemoveEntryIfDoesntExist string `yaml:"remove_entry_if_doesnt_exist" json:"remove_entry_if_doesnt_exist" doc:"removes the entry if the field does not exist"` - RemoveEntryIfEqual string `yaml:"remove_entry_if_equal" json:"remove_entry_if_equal" doc:"removes the entry if the field value equals specified value"` - RemoveEntryIfNotEqual string `yaml:"remove_entry_if_not_equal" json:"remove_entry_if_not_equal" doc:"removes the entry if the field value does not equal specified value"` - AddField string `yaml:"add_field" json:"add_field" doc:"adds (input) field to the entry; overrides previous value if present (key=input, value=value)"` - AddFieldIfDoesntExist string `yaml:"add_field_if_doesnt_exist" json:"add_field_if_doesnt_exist" doc:"adds a field to the entry if the field does not exist"` - AddFieldIf string `yaml:"add_field_if" json:"add_field_if" doc:"add output field set to assignee if input field satisfies criteria from parameters field"` - AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"` - AddLabel string `yaml:"add_label" json:"add_label" doc:"add (input) field to list of labels with value taken from Value field (key=input, value=value)"` - AddLabelIf string `yaml:"add_label_if" json:"add_label_if" doc:"add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field"` +type TransformFilterEnum string + +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + RemoveField TransformFilterEnum = "remove_field" // removes the field from the entry + RemoveEntryIfExists TransformFilterEnum = "remove_entry_if_exists" // removes the entry if the field exists + RemoveEntryIfDoesntExist TransformFilterEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist + RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value + RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value + AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value) + AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist + AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field + AddRegExIf TransformFilterEnum = "add_regex_if" // add output field if input field satisfies regex pattern from parameters field + AddLabel TransformFilterEnum = "add_label" // add (input) field to list of labels with value taken from Value field (key=input, value=value) + AddLabelIf TransformFilterEnum = "add_label_if" // add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field +) + +type TransformFilterRule struct { + Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"` + RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"` + RemoveEntryIfExists *TransformFilterGenericRule `yaml:"removeEntryIfExists,omitempty" json:"removeEntryIfExists,omitempty" doc:"configuration for remove_entry_if_exists rule"` + RemoveEntryIfDoesntExist *TransformFilterGenericRule `yaml:"removeEntryIfDoesntExist,omitempty" json:"removeEntryIfDoesntExist,omitempty" doc:"configuration for remove_entry_if_doesnt_exist rule"` + RemoveEntryIfEqual *TransformFilterGenericRule `yaml:"removeEntryIfEqual,omitempty" json:"removeEntryIfEqual,omitempty" doc:"configuration for remove_entry_if_equal rule"` + RemoveEntryIfNotEqual *TransformFilterGenericRule `yaml:"removeEntryIfNotEqual,omitempty" json:"removeEntryIfNotEqual,omitempty" doc:"configuration for remove_entry_if_not_equal rule"` + AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"` + AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"` + AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"` + AddRegExIf *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"` + AddLabel *TransformFilterGenericRule `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"` + AddLabelIf *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"` } -func TransformFilterOperationName(operation string) string { - return GetEnumName(TransformFilterOperationEnum{}, operation) +type TransformFilterGenericRule struct { + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"` } -type TransformFilterRule struct { - Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` - Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` - Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"` - Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"` - Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"` - Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` +type TransformFilterRuleWithAssignee struct { + Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` + Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"` + Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"` + Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"` } diff --git a/pkg/api/transform_generic.go b/pkg/api/transform_generic.go index d03e46cc0..c4b94a2b5 100644 --- a/pkg/api/transform_generic.go +++ b/pkg/api/transform_generic.go @@ -18,18 +18,17 @@ package api type TransformGeneric struct { - Policy string `yaml:"policy,omitempty" json:"policy,omitempty" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"` - Rules []GenericTransformRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of transform rules, each includes:"` + Policy TransformGenericOperationEnum `yaml:"policy,omitempty" json:"policy,omitempty" doc:"(enum) key replacement policy; may be one of the following:"` + Rules []GenericTransformRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of transform rules, each includes:"` } -type TransformGenericOperationEnum struct { - PreserveOriginalKeys string `yaml:"preserve_original_keys" json:"preserve_original_keys" doc:"adds new keys in addition to existing keys (default)"` - ReplaceKeys string `yaml:"replace_keys" json:"replace_keys" doc:"removes all old keys and uses only the new keys"` -} +type TransformGenericOperationEnum string -func TransformGenericOperationName(operation string) string { - return GetEnumName(TransformGenericOperationEnum{}, operation) -} +const ( + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + PreserveOriginalKeys TransformGenericOperationEnum = "preserve_original_keys" // adds new keys in addition to existing keys (default) + ReplaceKeys TransformGenericOperationEnum = "replace_keys" // removes all old keys and uses only the new keys +) type GenericTransformRule struct { Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"` diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index 1156c6276..012a6cbd5 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -38,38 +38,27 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) { return p, s } +type TransformNetworkOperationEnum string + const ( - OpAddSubnet = "add_subnet" - OpAddLocation = "add_location" - OpAddService = "add_service" - OpAddKubernetes = "add_kubernetes" - OpAddKubernetesInfra = "add_kubernetes_infra" - OpReinterpretDirection = "reinterpret_direction" - OpAddIPCategory = "add_ip_category" + // For doc generation, enum definitions must match format `Constant Type = "value" // doc` + NetworkAddSubnet TransformNetworkOperationEnum = "add_subnet" // add output subnet field from input field and prefix length from parameters field + NetworkAddLocation TransformNetworkOperationEnum = "add_location" // add output location fields from input + NetworkAddService TransformNetworkOperationEnum = "add_service" // add output network service field from input port and parameters protocol field + NetworkAddKubernetes TransformNetworkOperationEnum = "add_kubernetes" // add output kubernetes fields from input + NetworkAddKubernetesInfra TransformNetworkOperationEnum = "add_kubernetes_infra" // add output kubernetes isInfra field from input + NetworkReinterpretDirection TransformNetworkOperationEnum = "reinterpret_direction" // reinterpret flow direction at the node level (instead of net interface), to ease the deduplication process + NetworkAddIPCategory TransformNetworkOperationEnum = "add_ip_category" // categorize IPs based on known subnets configuration ) -type TransformNetworkOperationEnum struct { - AddSubnet string `yaml:"add_subnet" json:"add_subnet" doc:"add output subnet field from input field and prefix length from parameters field"` - AddLocation string `yaml:"add_location" json:"add_location" doc:"add output location fields from input"` - AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"` - AddKubernetes string `yaml:"add_kubernetes" json:"add_kubernetes" doc:"add output kubernetes fields from input"` - AddKubernetesInfra string `yaml:"add_kubernetes_infra" json:"add_kubernetes_infra" doc:"add output kubernetes isInfra field from input"` - ReinterpretDirection string `yaml:"reinterpret_direction" json:"reinterpret_direction" doc:"reinterpret flow direction at the node level (instead of net interface), to ease the deduplication process"` - AddIPCategory string `yaml:"add_ip_category" json:"add_ip_category" doc:"categorize IPs based on known subnets configuration"` -} - -func TransformNetworkOperationName(operation string) string { - return GetEnumName(TransformNetworkOperationEnum{}, operation) -} - type NetworkTransformRule struct { - Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"` - KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"` - Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"` - AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"` - AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"` - AddIPCategory *NetworkGenericRule `yaml:"add_ip_category,omitempty" json:"add_ip_category,omitempty" doc:"Add ip category rule configuration"` - AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"` + Type TransformNetworkOperationEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"` + KubernetesInfra *K8sInfraRule `yaml:"kubernetes_infra,omitempty" json:"kubernetes_infra,omitempty" doc:"Kubernetes infra rule configuration"` + Kubernetes *K8sRule `yaml:"kubernetes,omitempty" json:"kubernetes,omitempty" doc:"Kubernetes rule configuration"` + AddSubnet *NetworkAddSubnetRule `yaml:"add_subnet,omitempty" json:"add_subnet,omitempty" doc:"Add subnet rule configuration"` + AddLocation *NetworkGenericRule `yaml:"add_location,omitempty" json:"add_location,omitempty" doc:"Add location rule configuration"` + AddIPCategory *NetworkGenericRule `yaml:"add_ip_category,omitempty" json:"add_ip_category,omitempty" doc:"Add ip category rule configuration"` + AddService *NetworkAddServiceRule `yaml:"add_service,omitempty" json:"add_service,omitempty" doc:"Add service rule configuration"` } type K8sInfraRule struct { diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 53a789ced..53198219e 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -298,7 +298,7 @@ func Test_RunLongConfGen(t *testing.T) { }, out.Parameters[0].Ingest.Collector) // Expects transform generic - require.Equal(t, "replace_keys", out.Parameters[1].Transform.Generic.Policy) + require.Equal(t, api.ReplaceKeys, out.Parameters[1].Transform.Generic.Policy) // Expects transform network require.Len(t, out.Parameters[2].Transform.Network.Rules, 1) diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 4b041b995..568006dec 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -29,13 +29,13 @@ import ( func TestLokiPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "SrcAddr", Output: "SrcK8S", }, }, { - Type: api.AddKubernetesRuleType, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "DstAddr", Output: "DstK8S", @@ -69,8 +69,8 @@ func TestGRPCPipeline(t *testing.T) { pl := NewGRPCPipeline("grpc", api.IngestGRPCProto{Port: 9050, BufferLen: 50}) pl = pl.TransformFilter("filter", api.TransformFilter{ Rules: []api.TransformFilterRule{{ - Type: "remove_entry_if_doesnt_exist", - Input: "doesnt_exist", + Type: "remove_entry_if_doesnt_exist", + RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, }}, }) pl = pl.WriteStdout("stdout", api.WriteStdout{Format: "json"}) @@ -90,7 +90,7 @@ func TestGRPCPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"input":"doesnt_exist","type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -110,8 +110,8 @@ func TestKafkaPromPipeline(t *testing.T) { }) pl = pl.TransformFilter("filter", api.TransformFilter{ Rules: []api.TransformFilterRule{{ - Type: "remove_entry_if_doesnt_exist", - Input: "doesnt_exist", + Type: "remove_entry_if_doesnt_exist", + RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"}, }}, }) pl = pl.ConnTrack("conntrack", api.ConnTrack{ @@ -158,7 +158,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[1]) require.NoError(t, err) - require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"input":"doesnt_exist","type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) + require.JSONEq(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"removeEntryIfDoesntExist":{"input":"doesnt_exist"},"type":"remove_entry_if_doesnt_exist"}]}}}`, string(b)) b, err = json.Marshal(params[2]) require.NoError(t, err) @@ -203,13 +203,13 @@ func TestForkPipeline(t *testing.T) { func TestIPFIXPipeline(t *testing.T) { pl := NewCollectorPipeline("ingest", api.IngestCollector{HostName: "127.0.0.1", Port: 9999}) pl = pl.TransformNetwork("enrich", api.TransformNetwork{Rules: api.NetworkTransformRules{{ - Type: api.AddKubernetesRuleType, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "SrcAddr", Output: "SrcK8S", }, }, { - Type: api.AddKubernetesRuleType, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "DstAddr", Output: "DstK8S", diff --git a/pkg/pipeline/conntrack_integ_test.go b/pkg/pipeline/conntrack_integ_test.go index 6dac78b27..18361b798 100644 --- a/pkg/pipeline/conntrack_integ_test.go +++ b/pkg/pipeline/conntrack_integ_test.go @@ -25,6 +25,7 @@ import ( "time" test2 "github.com/mariomac/guara/pkg/test" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest" @@ -141,7 +142,7 @@ func TestConnTrack(t *testing.T) { "TimeFlowEnd": 1_637_501_829.0, "TimeFlowStart": 1_637_501_079.0, "_HashId": "d28db42bcd8aea8f", - "_RecordType": "endConnection", + "_RecordType": api.ConnTrackOutputRecordTypeEnum("endConnection"), "_IsFirst": false, "numFlowLogs": 5.0, } diff --git a/pkg/pipeline/decode/decode.go b/pkg/pipeline/decode/decode.go index d0e6b4dae..d85511e61 100644 --- a/pkg/pipeline/decode/decode.go +++ b/pkg/pipeline/decode/decode.go @@ -31,9 +31,9 @@ type Decoder interface { func GetDecoder(params api.Decoder) (Decoder, error) { switch params.Type { - case api.DecoderName("JSON"): + case api.DecoderJSON: return NewDecodeJSON() - case api.DecoderName("Protobuf"): + case api.DecoderProtobuf: return decode.NewProtobuf() } panic(fmt.Sprintf("`decode` type %s not defined", params.Type)) diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index 7dcf0f24f..a59f76b3b 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -76,15 +76,15 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E var balancer kafkago.Balancer switch config.Balancer { - case api.KafkaEncodeBalancerName("RoundRobin"): + case api.KafkaRoundRobin: balancer = &kafkago.RoundRobin{} - case api.KafkaEncodeBalancerName("LeastBytes"): + case api.KafkaLeastBytes: balancer = &kafkago.LeastBytes{} - case api.KafkaEncodeBalancerName("Hash"): + case api.KafkaHash: balancer = &kafkago.Hash{} - case api.KafkaEncodeBalancerName("Crc32"): + case api.KafkaCrc32: balancer = &kafkago.CRC32Balancer{} - case api.KafkaEncodeBalancerName("Murmur2"): + case api.KafkaMurmur2: balancer = &kafkago.Murmur2Balancer{} default: balancer = nil diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index d9f1de669..9f84bf169 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -139,7 +139,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Debugf("Labels = %v", labels) mInfo := CreateMetricInfo(mCfg) switch mCfg.Type { - case api.MetricEncodeOperationName("Counter"): + case api.MetricCounter: counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels) err := registerer.Register(counter) if err != nil { @@ -147,7 +147,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En return nil, err } metricCommon.AddCounter(counter, mInfo) - case api.MetricEncodeOperationName("Gauge"): + case api.MetricGauge: gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels) err := registerer.Register(gauge) if err != nil { @@ -155,7 +155,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En return nil, err } metricCommon.AddGauge(gauge, mInfo) - case api.MetricEncodeOperationName("Histogram"): + case api.MetricHistogram: log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) err := registerer.Register(hist) @@ -164,7 +164,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En return nil, err } metricCommon.AddHist(hist, mInfo) - case api.MetricEncodeOperationName("AggHistogram"): + case api.MetricAggHistogram: log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) err := registerer.Register(hist) diff --git a/pkg/pipeline/encode/encode_prom_metric.go b/pkg/pipeline/encode/encode_prom_metric.go index fc19f3dbf..407e55155 100644 --- a/pkg/pipeline/encode/encode_prom_metric.go +++ b/pkg/pipeline/encode/encode_prom_metric.go @@ -76,17 +76,17 @@ func NotRegex(filter api.MetricsFilter) Predicate { func filterToPredicate(filter api.MetricsFilter) Predicate { switch filter.Type { - case api.PromFilterEqual: + case api.MetricFilterEqual: return Equal(filter) - case api.PromFilterNotEqual: + case api.MetricFilterNotEqual: return NotEqual(filter) - case api.PromFilterPresence: + case api.MetricFilterPresence: return Presence(filter) - case api.PromFilterAbsence: + case api.MetricFilterAbsence: return Absence(filter) - case api.PromFilterRegex: + case api.MetricFilterRegex: return Regex(filter) - case api.PromFilterNotRegex: + case api.MetricFilterNotRegex: return NotRegex(filter) } // Default = Exact diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 3ebd84416..36b2908bd 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -134,14 +134,14 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar log.Debugf("Labels = %v", labels) mInfo := encode.CreateMetricInfo(mCfg) switch mCfg.Type { - case api.MetricEncodeOperationName("Counter"): + case api.MetricCounter: counter, err := meter.Float64Counter(fullMetricName) if err != nil { log.Errorf("error during counter creation: %v", err) return nil, err } metricCommon.AddCounter(counter, mInfo) - case api.MetricEncodeOperationName("Gauge"): + case api.MetricGauge: // at implementation time, only asynchronous gauges are supported by otel in golang obs := Float64Gauge{observations: make(map[string]Float64GaugeEntry)} gauge, err := meterFactory.Float64ObservableGauge( @@ -153,7 +153,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar return nil, err } metricCommon.AddGauge(gauge, mInfo) - case api.MetricEncodeOperationName("Histogram"): + case api.MetricHistogram: var histo metric.Float64Histogram if len(mCfg.Buckets) == 0 { histo, err = meter.Float64Histogram(fullMetricName) @@ -168,7 +168,9 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar return nil, err } metricCommon.AddHist(histo, mInfo) - case "default": + case api.MetricAggHistogram: + fallthrough + default: log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue } diff --git a/pkg/pipeline/extract/conntrack/aggregator.go b/pkg/pipeline/extract/conntrack/aggregator.go index 4b5e77afe..cf5b1de01 100644 --- a/pkg/pipeline/extract/conntrack/aggregator.go +++ b/pkg/pipeline/extract/conntrack/aggregator.go @@ -68,22 +68,22 @@ func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error) aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics, reportMissing: of.ReportMissing} var agg aggregator switch of.Operation { - case api.ConnTrackOperationName("Sum"): + case api.ConnTrackSum: aggBase.initVal = float64(0) agg = &aSum{aggBase} - case api.ConnTrackOperationName("Count"): + case api.ConnTrackCount: aggBase.initVal = float64(0) agg = &aCount{aggBase} - case api.ConnTrackOperationName("Min"): + case api.ConnTrackMin: aggBase.initVal = math.MaxFloat64 agg = &aMin{aggBase} - case api.ConnTrackOperationName("Max"): + case api.ConnTrackMax: aggBase.initVal = -math.MaxFloat64 agg = &aMax{aggBase} - case api.ConnTrackOperationName("First"): + case api.ConnTrackFirst: aggBase.initVal = nil agg = &aFirst{aggBase} - case api.ConnTrackOperationName("Last"): + case api.ConnTrackLast: aggBase.initVal = nil agg = &aLast{aggBase} default: diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index b0cf369ba..47d67a493 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -104,7 +104,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM if ct.shouldOutputNewConnection { record := conn.toGenericMap() addHashField(record, computedHash.hashTotal) - addTypeField(record, api.ConnTrackOutputRecordTypeName("NewConnection")) + addTypeField(record, api.ConnTrackNewConnection) isFirst := conn.markReported() addIsFirstField(record, isFirst) outputRecords = append(outputRecords, record) @@ -120,7 +120,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM if ct.shouldOutputFlowLogs { record := fl.Copy() addHashField(record, computedHash.hashTotal) - addTypeField(record, api.ConnTrackOutputRecordTypeName("FlowLog")) + addTypeField(record, api.ConnTrackFlowLog) outputRecords = append(outputRecords, record) ct.metrics.outputRecords.WithLabelValues("flowLog").Inc() } @@ -149,7 +149,7 @@ func (ct *conntrackImpl) popEndConnections() []config.GenericMap { for _, conn := range connections { record := conn.toGenericMap() addHashField(record, conn.getHash().hashTotal) - addTypeField(record, api.ConnTrackOutputRecordTypeName("EndConnection")) + addTypeField(record, api.ConnTrackEndConnection) var isFirst bool if ct.shouldOutputEndConnection { isFirst = conn.markReported() @@ -168,7 +168,7 @@ func (ct *conntrackImpl) prepareHeartbeatRecords() []config.GenericMap { for _, conn := range connections { record := conn.toGenericMap() addHashField(record, conn.getHash().hashTotal) - addTypeField(record, api.ConnTrackOutputRecordTypeName("Heartbeat")) + addTypeField(record, api.ConnTrackHeartbeat) var isFirst bool if ct.shouldOutputHeartbeats { isFirst = conn.markReported() @@ -247,13 +247,13 @@ func NewConnectionTrack(opMetrics *operational.Metrics, params config.StageParam shouldOutputHeartbeats := false for _, option := range cfg.OutputRecordTypes { switch option { - case api.ConnTrackOutputRecordTypeName("FlowLog"): + case api.ConnTrackFlowLog: shouldOutputFlowLogs = true - case api.ConnTrackOutputRecordTypeName("NewConnection"): + case api.ConnTrackNewConnection: shouldOutputNewConnection = true - case api.ConnTrackOutputRecordTypeName("EndConnection"): + case api.ConnTrackEndConnection: shouldOutputEndConnection = true - case api.ConnTrackOutputRecordTypeName("Heartbeat"): + case api.ConnTrackHeartbeat: shouldOutputHeartbeats = true default: return nil, fmt.Errorf("unknown OutputRecordTypes: %v", option) @@ -282,7 +282,7 @@ func addHashField(record config.GenericMap, hashID uint64) { record[api.HashIDFieldName] = strconv.FormatUint(hashID, 16) } -func addTypeField(record config.GenericMap, recordType string) { +func addTypeField(record config.GenericMap, recordType api.ConnTrackOutputRecordTypeEnum) { record[api.RecordTypeFieldName] = recordType } diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index 969451ee5..cf2f45210 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -36,7 +36,7 @@ import ( var opMetrics = operational.NewMetrics(&config.MetricsSettings{}) -func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, +func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []api.ConnTrackOutputRecordTypeEnum, heartbeatInterval, endConnectionTimeout time.Duration, terminatingTimeout time.Duration) *config.StageParam { splitAB := isBidirectional var hash api.ConnTrackHash @@ -128,14 +128,14 @@ func TestTrack(t *testing.T) { }{ { "duplicates, doesn't output connection events", - buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, + buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1Duplicated}, []config.GenericMap(nil), }, { "bidirectional, output new connection", - buildMockConnTrackConfig(true, []string{"newConnection"}, + buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ @@ -144,7 +144,7 @@ func TestTrack(t *testing.T) { }, { "bidirectional, output new connection and flow log", - buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}, + buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ @@ -158,7 +158,7 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection", - buildMockConnTrackConfig(false, []string{"newConnection"}, + buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"newConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ @@ -168,7 +168,7 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection and flow log", - buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}, + buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ @@ -206,7 +206,7 @@ func TestEndConn_Bidirectional(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -296,7 +296,7 @@ func TestEndConn_Unidirectional(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}, + conf := buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -402,7 +402,7 @@ func TestHeartbeat_Unidirectional(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "heartbeat", "endConnection"}, + conf := buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -554,7 +554,7 @@ func TestIsFirst_LongConnection(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, + conf := buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -646,7 +646,7 @@ func TestIsFirst_ShortConnection(t *testing.T) { endConnectionTimeout := 5 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, + conf := buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -712,7 +712,7 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat"}, + conf := buildMockConnTrackConfig(false, []api.ConnTrackOutputRecordTypeEnum{"heartbeat"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) interval := 10 * time.Second extract, err := NewConnectionTrack(opMetrics, *conf, clk) @@ -774,7 +774,7 @@ func TestScheduling(t *testing.T) { defaultEndConnectionTimeout := 15 * time.Second defaultTerminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"heartbeat", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"heartbeat", "endConnection"}, defaultHeartbeatInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) // Insert a scheduling group before the default group. // https://github.com/golang/go/wiki/SliceTricks#push-frontunshift @@ -941,7 +941,7 @@ func TestMaxConnections(t *testing.T) { endConnectionTimeout := 30 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) conf.Extract.ConnTrack.MaxConnectionsTracked = maxConnections extract, err := NewConnectionTrack(opMetrics, *conf, clk) @@ -976,7 +976,7 @@ func TestIsLastFlowLogOfConnection(t *testing.T) { endConnectionTimeout := 10 * time.Second terminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{}, heartbeatInterval, endConnectionTimeout, terminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ @@ -1051,7 +1051,7 @@ func TestDetectEndConnection(t *testing.T) { defaultEndConnectionTimeout := 10 * time.Second defaultTerminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "endConnection"}, defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ @@ -1135,7 +1135,7 @@ func TestSwapAB(t *testing.T) { defaultEndConnectionTimeout := 10 * time.Second defaultTerminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "endConnection"}, defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ @@ -1194,7 +1194,7 @@ func TestExpiringConnection(t *testing.T) { defaultEndConnectionTimeout := 10 * time.Second defaultTerminatingTimeout := 5 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, + conf := buildMockConnTrackConfig(true, []api.ConnTrackOutputRecordTypeEnum{"newConnection", "endConnection"}, defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ diff --git a/pkg/pipeline/extract/conntrack/utils_test.go b/pkg/pipeline/extract/conntrack/utils_test.go index 93693f01d..831e49689 100644 --- a/pkg/pipeline/extract/conntrack/utils_test.go +++ b/pkg/pipeline/extract/conntrack/utils_test.go @@ -139,7 +139,7 @@ func (m *mockRecord) withHash(hashStr string) *mockRecord { return m } -func (m *mockRecord) withType(recordType string) *mockRecord { +func (m *mockRecord) withType(recordType api.ConnTrackOutputRecordTypeEnum) *mockRecord { m.record[api.RecordTypeFieldName] = recordType return m } diff --git a/pkg/pipeline/extract/extract_timebased_test.go b/pkg/pipeline/extract/extract_timebased_test.go index 83bdf5135..7726bd892 100644 --- a/pkg/pipeline/extract/extract_timebased_test.go +++ b/pkg/pipeline/extract/extract_timebased_test.go @@ -227,35 +227,35 @@ func Test_ExtractTimebasedTopAvg(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "TopK_Bytes1", - "operation": "last", + "operation": api.FilterOperationLast, "Bytes": float64(1000), "index_key": "SrcAddr", "SrcAddr": "10.0.0.4", }, { "name": "TopK_Bytes1", - "operation": "last", + "operation": api.FilterOperationLast, "Bytes": float64(900), "index_key": "SrcAddr", "SrcAddr": "10.0.0.3", }, { "name": "TopK_Bytes1", - "operation": "last", + "operation": api.FilterOperationLast, "Bytes": float64(800), "index_key": "SrcAddr", "SrcAddr": "10.0.0.2", }, { "name": "BotK_Bytes1", - "operation": "avg", + "operation": api.FilterOperationAvg, "Bytes": float64(400), "index_key": "SrcAddr", "SrcAddr": "10.0.0.1", }, { "name": "BotK_Bytes1", - "operation": "avg", + "operation": api.FilterOperationAvg, "Bytes": float64(500), "index_key": "SrcAddr", "SrcAddr": "10.0.0.2", @@ -273,7 +273,7 @@ func Test_ExtractTimebasedSum(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "TopK_Bytes2", - "operation": "sum", + "operation": api.FilterOperationSum, "Bytes": float64(1800), "index_key": "SrcAddr", "SrcAddr": "10.0.0.3", @@ -291,7 +291,7 @@ func Test_ExtractTimebasedDiff(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "BotK_Bytes3", - "operation": "diff", + "operation": api.FilterOperationDiff, "Bytes": float64(0), "index_key": "SrcAddr", "SrcAddr": "10.0.0.4", @@ -309,7 +309,7 @@ func Test_ExtractTimebasedMax(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "TopK_Bytes4", - "operation": "max", + "operation": api.FilterOperationMax, "Bytes": float64(1000), "index_key": "SrcAddr", "SrcAddr": "10.0.0.4", @@ -327,7 +327,7 @@ func Test_ExtractTimebasedMinReversed(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "BotK_Bytes5", - "operation": "min", + "operation": api.FilterOperationMin, "Bytes": float64(100), "index_key": "SrcAddr", "SrcAddr": "10.0.0.1", @@ -345,28 +345,28 @@ func Test_ExtractTimebasedAllFlows(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "All_Bytes6", - "operation": "sum", + "operation": api.FilterOperationSum, "Bytes": float64(1200), "index_key": "SrcAddr", "SrcAddr": "10.0.0.1", }, { "name": "All_Bytes6", - "operation": "sum", + "operation": api.FilterOperationSum, "Bytes": float64(1500), "index_key": "SrcAddr", "SrcAddr": "10.0.0.2", }, { "name": "All_Bytes6", - "operation": "sum", + "operation": api.FilterOperationSum, "Bytes": float64(1800), "index_key": "SrcAddr", "SrcAddr": "10.0.0.3", }, { "name": "All_Bytes6", - "operation": "sum", + "operation": api.FilterOperationSum, "Bytes": float64(1000), "index_key": "SrcAddr", "SrcAddr": "10.0.0.4", @@ -386,28 +386,28 @@ func Test_ExtractTimebasedCount(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "Count_Flows", - "operation": "count", + "operation": api.FilterOperationCnt, "Bytes": float64(3), "index_key": "SrcAddr", "SrcAddr": "10.0.0.1", }, { "name": "Count_Flows", - "operation": "count", + "operation": api.FilterOperationCnt, "Bytes": float64(3), "index_key": "SrcAddr", "SrcAddr": "10.0.0.2", }, { "name": "Count_Flows", - "operation": "count", + "operation": api.FilterOperationCnt, "Bytes": float64(3), "index_key": "SrcAddr", "SrcAddr": "10.0.0.3", }, { "name": "Count_Flows", - "operation": "count", + "operation": api.FilterOperationCnt, "Bytes": float64(1), "index_key": "SrcAddr", "SrcAddr": "10.0.0.4", @@ -427,7 +427,7 @@ func Test_ExtractTimebasedMultiple(t *testing.T) { expectedOutput := []config.GenericMap{ { "name": "BotK_SrcDst_Bytes", - "operation": "avg", + "operation": api.FilterOperationAvg, "Bytes": float64(1000), "index_key": "SrcAddr,DstAddr,Direction", "SrcAddr": "10.0.0.4", @@ -436,7 +436,7 @@ func Test_ExtractTimebasedMultiple(t *testing.T) { }, { "name": "BotK_SrcDst_Bytes", - "operation": "avg", + "operation": api.FilterOperationAvg, "Bytes": float64(500), "index_key": "SrcAddr,DstAddr,Direction", "SrcAddr": "10.0.0.2", diff --git a/pkg/pipeline/extract/timebased/filters.go b/pkg/pipeline/extract/timebased/filters.go index c556ec975..cd47c2188 100644 --- a/pkg/pipeline/extract/timebased/filters.go +++ b/pkg/pipeline/extract/timebased/filters.go @@ -35,8 +35,9 @@ func (fs *FilterStruct) CalculateResults(nowInSecs time.Time) { for tableKey, l := range fs.IndexKeyDataTable.dataTableMap { var valueFloat64 = float64(0) var err error + //nolint:exhaustive switch fs.Rule.OperationType { - case api.FilterOperationName("FilterOperationLast"): + case api.FilterOperationLast: // handle empty list if l.Len() == 0 { continue @@ -45,7 +46,7 @@ func (fs *FilterStruct) CalculateResults(nowInSecs time.Time) { if err != nil { continue } - case api.FilterOperationName("FilterOperationDiff"): + case api.FilterOperationDiff: for e := l.Front(); e != nil; e = e.Next() { cEntry := e.Value.(*TableEntry) if cEntry.timeStamp.Before(oldestValidTime) { @@ -90,35 +91,36 @@ func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time) } else { nItems++ switch fs.Rule.OperationType { - case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"): + case api.FilterOperationSum, api.FilterOperationAvg: currentValue += valueFloat64 - case api.FilterOperationName("FilterOperationMax"): + case api.FilterOperationMax: currentValue = math.Max(currentValue, valueFloat64) - case api.FilterOperationName("FilterOperationMin"): + case api.FilterOperationMin: currentValue = math.Min(currentValue, valueFloat64) + case api.FilterOperationCnt, api.FilterOperationLast, api.FilterOperationDiff: } } } - if fs.Rule.OperationType == api.FilterOperationName("FilterOperationAvg") && nItems > 0 { + if fs.Rule.OperationType == api.FilterOperationAvg && nItems > 0 { currentValue = currentValue / float64(nItems) } - if fs.Rule.OperationType == api.FilterOperationName("FilterOperationCnt") { + if fs.Rule.OperationType == api.FilterOperationCnt { currentValue = float64(nItems) } return currentValue } -func getInitValue(operation string) float64 { +func getInitValue(operation api.FilterOperationEnum) float64 { switch operation { - case api.FilterOperationName("FilterOperationSum"), - api.FilterOperationName("FilterOperationAvg"), - api.FilterOperationName("FilterOperationCnt"), - api.FilterOperationName("FilterOperationLast"), - api.FilterOperationName("FilterOperationDiff"): + case api.FilterOperationSum, + api.FilterOperationAvg, + api.FilterOperationCnt, + api.FilterOperationLast, + api.FilterOperationDiff: return 0 - case api.FilterOperationName("FilterOperationMax"): + case api.FilterOperationMax: return (-math.MaxFloat64) - case api.FilterOperationName("FilterOperationMin"): + case api.FilterOperationMin: return math.MaxFloat64 default: log.Panicf("unknown operation %v", operation) diff --git a/pkg/pipeline/extract/timebased/timebased.go b/pkg/pipeline/extract/timebased/timebased.go index c58351b6e..0a003f57e 100644 --- a/pkg/pipeline/extract/timebased/timebased.go +++ b/pkg/pipeline/extract/timebased/timebased.go @@ -87,13 +87,13 @@ func CreateIndexKeysAndFilters(rules []api.TimebasedFilterRule) (map[string]*Ind } // verify the validity of the OperationType field in the filterRule switch filterRule.OperationType { - case api.FilterOperationName("FilterOperationLast"), - api.FilterOperationName("FilterOperationDiff"), - api.FilterOperationName("FilterOperationCnt"), - api.FilterOperationName("FilterOperationAvg"), - api.FilterOperationName("FilterOperationMax"), - api.FilterOperationName("FilterOperationMin"), - api.FilterOperationName("FilterOperationSum"): + case api.FilterOperationLast, + api.FilterOperationDiff, + api.FilterOperationCnt, + api.FilterOperationAvg, + api.FilterOperationMax, + api.FilterOperationMin, + api.FilterOperationSum: // OK; nothing to do default: log.Errorf("illegal operation type %s", filterRule.OperationType) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 97e02114b..c5dac331a 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -294,6 +294,6 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I batchMaxLength: bml, batchReadTimeout: batchReadTimeout, metrics: metrics, - canLogMessages: jsonIngestKafka.Decoder.Type == api.DecoderName("JSON"), + canLogMessages: jsonIngestKafka.Decoder.Type == api.DecoderJSON, }, nil } diff --git a/pkg/pipeline/ingest/ingest_stdin.go b/pkg/pipeline/ingest/ingest_stdin.go index 1aa248056..1b425cf4d 100644 --- a/pkg/pipeline/ingest/ingest_stdin.go +++ b/pkg/pipeline/ingest/ingest_stdin.go @@ -102,7 +102,7 @@ func NewIngestStdin(opMetrics *operational.Metrics, params config.StageParam) (I in := make(chan string, stdinChannelSize) eof := make(chan struct{}) metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) }) - decoderParams := api.Decoder{Type: api.DecoderName("JSON")} + decoderParams := api.Decoder{Type: api.DecoderJSON} decoder, err := decode.GetDecoder(decoderParams) if err != nil { return nil, err diff --git a/pkg/pipeline/transform/kubernetes/enrich_test.go b/pkg/pipeline/transform/kubernetes/enrich_test.go index 0a8079eaf..e1339609e 100644 --- a/pkg/pipeline/transform/kubernetes/enrich_test.go +++ b/pkg/pipeline/transform/kubernetes/enrich_test.go @@ -63,7 +63,7 @@ var nodes = map[string]*inf.Info{ var rules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "SrcAddr", Output: "SrcK8s", @@ -71,7 +71,7 @@ var rules = api.NetworkTransformRules{ }, }, { - Type: api.OpAddKubernetes, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "DstAddr", Output: "DstK8s", @@ -162,7 +162,7 @@ func TestEnrich(t *testing.T) { var otelRules = api.NetworkTransformRules{ { - Type: api.OpAddKubernetes, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "source.ip", Output: "source.", @@ -171,7 +171,7 @@ var otelRules = api.NetworkTransformRules{ }, }, { - Type: api.OpAddKubernetes, + Type: api.NetworkAddKubernetes, Kubernetes: &api.K8sRule{ Input: "destination.ip", Output: "destination.", diff --git a/pkg/pipeline/transform/transform_filter.go b/pkg/pipeline/transform/transform_filter.go index 5c5fbdf8e..db233da4d 100644 --- a/pkg/pipeline/transform/transform_filter.go +++ b/pkg/pipeline/transform/transform_filter.go @@ -45,72 +45,72 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) { for _, rule := range f.Rules { tlog.Tracef("rule = %v", rule) switch rule.Type { - case api.TransformFilterOperationName("RemoveField"): - delete(outputEntry, rule.Input) - case api.TransformFilterOperationName("RemoveEntryIfExists"): - if _, ok := entry[rule.Input]; ok { + case api.RemoveField: + delete(outputEntry, rule.RemoveField.Input) + case api.RemoveEntryIfExists: + if _, ok := entry[rule.RemoveEntryIfExists.Input]; ok { return nil, false } - case api.TransformFilterOperationName("RemoveEntryIfDoesntExist"): - if _, ok := entry[rule.Input]; !ok { + case api.RemoveEntryIfDoesntExist: + if _, ok := entry[rule.RemoveEntryIfDoesntExist.Input]; !ok { return nil, false } - case api.TransformFilterOperationName("RemoveEntryIfEqual"): - if val, ok := entry[rule.Input]; ok { - if val == rule.Value { + case api.RemoveEntryIfEqual: + if val, ok := entry[rule.RemoveEntryIfEqual.Input]; ok { + if val == rule.RemoveEntryIfEqual.Value { return nil, false } } - case api.TransformFilterOperationName("RemoveEntryIfNotEqual"): - if val, ok := entry[rule.Input]; ok { - if val != rule.Value { + case api.RemoveEntryIfNotEqual: + if val, ok := entry[rule.RemoveEntryIfNotEqual.Input]; ok { + if val != rule.RemoveEntryIfNotEqual.Value { return nil, false } } - case api.TransformFilterOperationName("AddField"): - outputEntry[rule.Input] = rule.Value - case api.TransformFilterOperationName("AddFieldIfDoesntExist"): - if _, ok := entry[rule.Input]; !ok { - outputEntry[rule.Input] = rule.Value + case api.AddField: + outputEntry[rule.AddField.Input] = rule.AddField.Value + case api.AddFieldIfDoesntExist: + if _, ok := entry[rule.AddFieldIfDoesntExist.Input]; !ok { + outputEntry[rule.AddFieldIfDoesntExist.Input] = rule.AddFieldIfDoesntExist.Value } - case api.TransformFilterOperationName("AddRegExIf"): - matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input])) + case api.AddRegExIf: + matched, err := regexp.MatchString(rule.AddRegExIf.Parameters, fmt.Sprintf("%s", outputEntry[rule.AddRegExIf.Input])) if err != nil { continue } if matched { - outputEntry[rule.Output] = outputEntry[rule.Input] - outputEntry[rule.Output+"_Matched"] = true + outputEntry[rule.AddRegExIf.Output] = outputEntry[rule.AddRegExIf.Input] + outputEntry[rule.AddRegExIf.Output+"_Matched"] = true } - case api.TransformFilterOperationName("AddFieldIf"): - expressionString := fmt.Sprintf("val %s", rule.Parameters) + case api.AddFieldIf: + expressionString := fmt.Sprintf("val %s", rule.AddFieldIf.Parameters) expression, err := govaluate.NewEvaluableExpression(expressionString) if err != nil { log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err) continue } - result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]}) + result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.AddFieldIf.Input]}) if evaluateErr == nil && result.(bool) { - if rule.Assignee != "" { - outputEntry[rule.Output] = rule.Assignee + if rule.AddFieldIf.Assignee != "" { + outputEntry[rule.AddFieldIf.Output] = rule.AddFieldIf.Assignee } else { - outputEntry[rule.Output] = outputEntry[rule.Input] + outputEntry[rule.AddFieldIf.Output] = outputEntry[rule.AddFieldIf.Input] } - outputEntry[rule.Output+"_Evaluate"] = true + outputEntry[rule.AddFieldIf.Output+"_Evaluate"] = true } - case api.TransformFilterOperationName("AddLabel"): - labels[rule.Input], _ = utils.ConvertToString(rule.Value) - case api.TransformFilterOperationName("AddLabelIf"): + case api.AddLabel: + labels[rule.AddLabel.Input], _ = utils.ConvertToString(rule.AddLabel.Value) + case api.AddLabelIf: // TODO perhaps add a cache of previously evaluated expressions - expressionString := fmt.Sprintf("val %s", rule.Parameters) + expressionString := fmt.Sprintf("val %s", rule.AddLabelIf.Parameters) expression, err := govaluate.NewEvaluableExpression(expressionString) if err != nil { log.Warningf("Can't evaluate AddLabelIf rule: %+v expression: %v. err %v", rule, expressionString, err) continue } - result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]}) + result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.AddLabelIf.Input]}) if evaluateErr == nil && result.(bool) { - labels[rule.Output] = rule.Assignee + labels[rule.AddLabelIf.Output] = rule.AddLabelIf.Assignee } default: tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule) diff --git a/pkg/pipeline/transform/transform_filter_test.go b/pkg/pipeline/transform/transform_filter_test.go index 82557585d..6051e7d20 100644 --- a/pkg/pipeline/transform/transform_filter_test.go +++ b/pkg/pipeline/transform/transform_filter_test.go @@ -36,10 +36,12 @@ parameters: type: filter filter: rules: - - input: dstPort - type: remove_field - - input: srcPort - type: remove_field + - type: remove_field + removeField: + input: dstPort + - type: remove_field + removeField: + input: srcPort ` const testConfigTransformFilterRemoveEntryIfExists = `--- @@ -52,8 +54,10 @@ parameters: type: filter filter: rules: - - input: srcPort - type: remove_entry_if_exists + - type: remove_entry_if_exists + removeEntryIfExists: + input: srcPort + ` const testConfigTransformFilterRemoveEntryIfDoesntExists = `--- @@ -66,8 +70,10 @@ parameters: type: filter filter: rules: - - input: doesntSrcPort - type: remove_entry_if_doesnt_exist + - type: remove_entry_if_doesnt_exist + removeEntryIfDoesntExist: + input: doesntSrcPort + ` const testConfigTransformFilterRemoveEntryIfEqual = `--- log-level: debug @@ -79,12 +85,14 @@ parameters: type: filter filter: rules: - - input: message - type: remove_entry_if_equal - value: "test message" - - input: value - type: remove_entry_if_equal - value: 8.0 + - type: remove_entry_if_equal + removeEntryIfEqual: + input: message + value: "test message" + - type: remove_entry_if_equal + removeEntryIfEqual: + input: value + value: 8.0 ` const testConfigTransformFilterRemoveEntryIfNotEqual = `--- @@ -97,9 +105,10 @@ parameters: type: filter filter: rules: - - input: message - type: remove_entry_if_not_equal - value: "test message" + - type: remove_entry_if_not_equal + removeEntryIfNotEqual: + input: message + value: "test message" ` const testConfigTransformFilterAddField = `--- @@ -112,12 +121,14 @@ parameters: type: filter filter: rules: - - input: dstPort - type: add_field_if_doesnt_exist - value: dummy_value - - input: dummy_field - type: add_field_if_doesnt_exist - value: dummy_value + - type: add_field_if_doesnt_exist + addFieldIfDoesntExist: + input: dstPort + value: dummy_value + - type: add_field_if_doesnt_exist + addFieldIfDoesntExist: + input: dummy_field + value: dummy_value ` func getFilterExpectedOutput() config.GenericMap { @@ -236,44 +247,56 @@ func Test_Transform_AddIfScientificNotation(t *testing.T) { newFilter := Filter{ Rules: []api.TransformFilterRule{ { - Input: "value", - Output: "bigger_than_10", - Type: "add_field_if", - Parameters: ">10", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "bigger_than_10", + Parameters: ">10", + }, }, { - Input: "value", - Output: "smaller_than_10", - Type: "add_field_if", - Parameters: "<10", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "smaller_than_10", + Parameters: "<10", + }, }, { - Input: "value", - Output: "dir", - Assignee: "in", - Type: "add_field_if", - Parameters: "==1", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "dir", + Assignee: "in", + Parameters: "==1", + }, }, { - Input: "value", - Output: "dir", - Assignee: "out", - Type: "add_field_if", - Parameters: "==0", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "dir", + Assignee: "out", + Parameters: "==0", + }, }, { - Input: "value", - Output: "not_one", - Assignee: "true", - Type: "add_field_if", - Parameters: "!=1", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "not_one", + Assignee: "true", + Parameters: "!=1", + }, }, { - Input: "value", - Output: "not_one", - Assignee: "false", - Type: "add_field_if", - Parameters: "==1", + Type: "add_field_if", + AddFieldIf: &api.TransformFilterRuleWithAssignee{ + Input: "value", + Output: "not_one", + Assignee: "false", + Parameters: "==1", + }, }, }, } @@ -329,17 +352,20 @@ parameters: type: filter filter: rules: - - input: subnetSrcIP - type: add_field_if_doesnt_exist - value: 10.0.0.0/24 - - input: subnetSrcIP - output: match-10.0.* - type: add_regex_if - parameters: 10.0.* - - input: subnetSrcIP - output: match-11.0.* - type: add_regex_if - parameters: 11.0.* + - type: add_field_if_doesnt_exist + addFieldIfDoesntExist: + input: subnetSrcIP + value: 10.0.0.0/24 + - type: add_regex_if + addRegexIf: + input: subnetSrcIP + output: match-10.0.* + parameters: 10.0.* + - type: add_regex_if + addRegexIf: + input: subnetSrcIP + output: match-11.0.* + parameters: 11.0.* - name: write1 write: type: stdout @@ -368,32 +394,40 @@ func Test_AddLabelIf(t *testing.T) { Filter: &api.TransformFilter{ Rules: []api.TransformFilterRule{ { - Type: "add_label_if", - Input: "param1", - Parameters: "<10", - Output: "group1", - Assignee: "LT10", + Type: "add_label_if", + AddLabelIf: &api.TransformFilterRuleWithAssignee{ + Input: "param1", + Parameters: "<10", + Output: "group1", + Assignee: "LT10", + }, }, { - Type: "add_label_if", - Input: "param1", - Parameters: ">=10", - Output: "group1", - Assignee: "GE10", + Type: "add_label_if", + AddLabelIf: &api.TransformFilterRuleWithAssignee{ + Input: "param1", + Parameters: ">=10", + Output: "group1", + Assignee: "GE10", + }, }, { - Type: "add_label_if", - Input: "param2", - Parameters: "<5", - Output: "group2", - Assignee: "LT5", + Type: "add_label_if", + AddLabelIf: &api.TransformFilterRuleWithAssignee{ + Input: "param2", + Parameters: "<5", + Output: "group2", + Assignee: "LT5", + }, }, { - Type: "add_label_if", - Input: "param3", - Parameters: "<0", - Output: "group3", - Assignee: "LT0", + Type: "add_label_if", + AddLabelIf: &api.TransformFilterRuleWithAssignee{ + Input: "param3", + Parameters: "<0", + Output: "group3", + Assignee: "LT0", + }, }, }, }, @@ -424,19 +458,25 @@ func Test_AddLabel(t *testing.T) { Filter: &api.TransformFilter{ Rules: []api.TransformFilterRule{ { - Type: "add_label", - Input: "key1", - Value: "value1", + Type: "add_label", + AddLabel: &api.TransformFilterGenericRule{ + Input: "key1", + Value: "value1", + }, }, { - Type: "add_label", - Input: "key2", - Value: "value2", + Type: "add_label", + AddLabel: &api.TransformFilterGenericRule{ + Input: "key2", + Value: "value2", + }, }, { - Type: "add_label", - Input: "key3", - Value: "value3", + Type: "add_label", + AddLabel: &api.TransformFilterGenericRule{ + Input: "key3", + Value: "value3", + }, }, }, }, @@ -478,14 +518,18 @@ func Test_AddField(t *testing.T) { Filter: &api.TransformFilter{ Rules: []api.TransformFilterRule{ { - Type: "add_field", - Input: "field1", - Value: "value1", + Type: "add_field", + AddField: &api.TransformFilterGenericRule{ + Input: "field1", + Value: "value1", + }, }, { - Type: "add_field", - Input: "param1", - Value: "new_value", + Type: "add_field", + AddField: &api.TransformFilterGenericRule{ + Input: "param1", + Value: "new_value", + }, }, }, }, diff --git a/pkg/pipeline/transform/transform_generic.go b/pkg/pipeline/transform/transform_generic.go index e43f6ea77..c1f2e4dcf 100644 --- a/pkg/pipeline/transform/transform_generic.go +++ b/pkg/pipeline/transform/transform_generic.go @@ -26,7 +26,7 @@ import ( var glog = logrus.WithField("component", "transform.Generic") type Generic struct { - policy string + policy api.TransformGenericOperationEnum rules []api.GenericTransformRule } @@ -96,7 +96,7 @@ func NewTransformGeneric(params config.StageParam) (Transformer, error) { rules := genConfig.Rules policy := genConfig.Policy switch policy { - case "replace_keys", "preserve_original_keys", "": + case api.ReplaceKeys, api.PreserveOriginalKeys, "": // valid; nothing to do glog.Infof("NewTransformGeneric, policy = %s", policy) default: diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 2eed36060..f51b17176 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -55,7 +55,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo // TODO: for efficiency and maintainability, maybe each case in the switch below should be an individual implementation of Transformer for _, rule := range n.Rules { switch rule.Type { - case api.OpAddSubnet: + case api.NetworkAddSubnet: if rule.AddSubnet == nil { log.Errorf("Missing add subnet configuration") continue @@ -66,7 +66,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo continue } outputEntry[rule.AddSubnet.Output] = ipv4Net.String() - case api.OpAddLocation: + case api.NetworkAddLocation: if rule.AddLocation == nil { log.Errorf("Missing add location configuration") continue @@ -83,7 +83,7 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo outputEntry[rule.AddLocation.Output+"_CityName"] = locationInfo.CityName outputEntry[rule.AddLocation.Output+"_Latitude"] = locationInfo.Latitude outputEntry[rule.AddLocation.Output+"_Longitude"] = locationInfo.Longitude - case api.OpAddService: + case api.NetworkAddService: if rule.AddService == nil { log.Errorf("Missing add service configuration") continue @@ -110,17 +110,17 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo } } outputEntry[rule.AddService.Output] = serviceName - case api.OpAddKubernetes: + case api.NetworkAddKubernetes: kubernetes.Enrich(outputEntry, *rule.Kubernetes) - case api.OpAddKubernetesInfra: + case api.NetworkAddKubernetesInfra: if rule.KubernetesInfra == nil { logrus.Error("transformation rule: Missing configuration ") continue } kubernetes.EnrichLayer(outputEntry, rule.KubernetesInfra) - case api.OpReinterpretDirection: + case api.NetworkReinterpretDirection: reinterpretDirection(outputEntry, &n.DirectionInfo) - case api.OpAddIPCategory: + case api.NetworkAddIPCategory: if rule.AddIPCategory == nil { logrus.Error("AddIPCategory rule: Missing configuration ") continue @@ -169,22 +169,23 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { } for _, rule := range jsonNetworkTransform.Rules { switch rule.Type { - case api.OpAddLocation: + case api.NetworkAddLocation: needToInitLocationDB = true - case api.OpAddKubernetes: + case api.NetworkAddKubernetes: needToInitKubeData = true - case api.OpAddKubernetesInfra: + case api.NetworkAddKubernetesInfra: needToInitKubeData = true - case api.OpAddService: + case api.NetworkAddService: needToInitNetworkServices = true - case api.OpReinterpretDirection: + case api.NetworkReinterpretDirection: if err := validateReinterpretDirectionConfig(&jsonNetworkTransform.DirectionInfo); err != nil { return nil, err } - case api.OpAddIPCategory: + case api.NetworkAddIPCategory: if len(jsonNetworkTransform.IPCategories) == 0 { - return nil, fmt.Errorf("a rule '%s' was found, but there are no IP categories configured", api.OpAddIPCategory) + return nil, fmt.Errorf("a rule '%s' was found, but there are no IP categories configured", api.NetworkAddIPCategory) } + case api.NetworkAddSubnet: } } diff --git a/pkg/pipeline/transform/transform_network_direction.go b/pkg/pipeline/transform/transform_network_direction.go index f4c55f687..5f466088f 100644 --- a/pkg/pipeline/transform/transform_network_direction.go +++ b/pkg/pipeline/transform/transform_network_direction.go @@ -15,16 +15,16 @@ const ( func validateReinterpretDirectionConfig(info *api.NetworkTransformDirectionInfo) error { if info.FlowDirectionField == "" { - return fmt.Errorf("invalid config for transform.Network rule %s: missing FlowDirectionField", api.OpReinterpretDirection) + return fmt.Errorf("invalid config for transform.Network rule %s: missing FlowDirectionField", api.NetworkReinterpretDirection) } if info.ReporterIPField == "" { - return fmt.Errorf("invalid config for transform.Network rule %s: missing ReporterIPField", api.OpReinterpretDirection) + return fmt.Errorf("invalid config for transform.Network rule %s: missing ReporterIPField", api.NetworkReinterpretDirection) } if info.SrcHostField == "" { - return fmt.Errorf("invalid config for transform.Network rule %s: missing SrcHostField", api.OpReinterpretDirection) + return fmt.Errorf("invalid config for transform.Network rule %s: missing SrcHostField", api.NetworkReinterpretDirection) } if info.DstHostField == "" { - return fmt.Errorf("invalid config for transform.Network rule %s: missing DstHostField", api.OpReinterpretDirection) + return fmt.Errorf("invalid config for transform.Network rule %s: missing DstHostField", api.NetworkReinterpretDirection) } return nil } diff --git a/pkg/pipeline/transform/transform_network_test.go b/pkg/pipeline/transform/transform_network_test.go index 21da21cbf..e26a1ddbe 100644 --- a/pkg/pipeline/transform/transform_network_test.go +++ b/pkg/pipeline/transform/transform_network_test.go @@ -227,10 +227,10 @@ func Test_Categorize(t *testing.T) { Transform: &config.Transform{ Network: &api.TransformNetwork{ Rules: []api.NetworkTransformRule{ - {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr1", Output: "cat1"}}, - {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr2", Output: "cat2"}}, - {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr3", Output: "cat3"}}, - {Type: api.OpAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr4", Output: "cat4"}}, + {Type: api.NetworkAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr1", Output: "cat1"}}, + {Type: api.NetworkAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr2", Output: "cat2"}}, + {Type: api.NetworkAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr3", Output: "cat3"}}, + {Type: api.NetworkAddIPCategory, AddIPCategory: &api.NetworkGenericRule{Input: "addr4", Output: "cat4"}}, }, IPCategories: []api.NetworkTransformIPCategory{{ Name: "Pods overlay", diff --git a/pkg/pipeline/utils/sasl.go b/pkg/pipeline/utils/sasl.go index 664ee2153..06d0c75d2 100644 --- a/pkg/pipeline/utils/sasl.go +++ b/pkg/pipeline/utils/sasl.go @@ -26,9 +26,9 @@ func SetupSASLMechanism(cfg *api.SASLConfig) (sasl.Mechanism, error) { strPwd := strings.TrimSpace(string(pwd)) var mechanism sasl.Mechanism switch cfg.Type { - case api.SASLTypeName("Plain"): + case api.SASLPlain: mechanism = plain.Mechanism{Username: strID, Password: strPwd} - case api.SASLTypeName("ScramSHA512"): + case api.SASLScramSHA512: mechanism, err = scram.Mechanism(scram.SHA512, strID, strPwd) default: return nil, fmt.Errorf("unknown SASL type: %s", cfg.Type)