diff --git a/cmd/flowlogs-pipeline/main.go b/cmd/flowlogs-pipeline/main.go
index c2e1fac9e..fbb39dea9 100644
--- a/cmd/flowlogs-pipeline/main.go
+++ b/cmd/flowlogs-pipeline/main.go
@@ -106,7 +106,10 @@ func initLogger() {
 }
 
 func dumpConfig(opts config.Options) {
-	configAsJSON, _ := json.MarshalIndent(opts, "", " ")
+	configAsJSON, err := json.MarshalIndent(opts, "", " ")
+	if err != nil {
+		panic(fmt.Sprintf("error dumping config: %v", err))
+	}
 	fmt.Printf("Using configuration:\n%s\n", configAsJSON)
 }
 
diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go
index 44ee57222..7dcf0f24f 100644
--- a/pkg/pipeline/encode/encode_kafka.go
+++ b/pkg/pipeline/encode/encode_kafka.go
@@ -49,11 +49,16 @@ type encodeKafka struct {
 // Encode writes entries to kafka topic
 func (r *encodeKafka) Encode(entry config.GenericMap) {
 	var entryByteArray []byte
-	entryByteArray, _ = json.Marshal(entry)
+	var err error
+	entryByteArray, err = json.Marshal(entry)
+	if err != nil {
+		log.Errorf("encodeKafka error: %v", err)
+		return
+	}
 	msg := kafkago.Message{
 		Value: entryByteArray,
 	}
-	err := r.kafkaWriter.WriteMessages(context.Background(), msg)
+	err = r.kafkaWriter.WriteMessages(context.Background(), msg)
 	if err != nil {
 		log.Errorf("encodeKafka error: %v", err)
 	} else {
diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go
index 2c106e5bb..b96b14dff 100644
--- a/pkg/pipeline/extract/aggregate/aggregate.go
+++ b/pkg/pipeline/extract/aggregate/aggregate.go
@@ -153,22 +153,26 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
 		value, ok := entry[operationKey]
 		if ok {
 			valueString := fmt.Sprintf("%v", value)
-			valueFloat64, _ := strconv.ParseFloat(valueString, 64)
-			switch operation {
-			case OperationSum:
-				groupState.totalValue += valueFloat64
-				groupState.recentOpValue += valueFloat64
-			case OperationMax:
-				groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
-				groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
-			case OperationMin:
-				groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
-				groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
-			case OperationAvg:
-				groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
-				groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
-			case OperationRawValues:
-				groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
+			if valueFloat64, err := strconv.ParseFloat(valueString, 64); err != nil {
+				// Log as debug to avoid performance impact
+				log.Debugf("UpdateByEntry error when parsing float '%s': %v", valueString, err)
+			} else {
+				switch operation {
+				case OperationSum:
+					groupState.totalValue += valueFloat64
+					groupState.recentOpValue += valueFloat64
+				case OperationMax:
+					groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
+					groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
+				case OperationMin:
+					groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
+					groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
+				case OperationAvg:
+					groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
+					groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
+				case OperationRawValues:
+					groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
+				}
 			}
 		}
 	}
diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go
index c1471d27e..c0caf1c35 100644
--- a/pkg/pipeline/extract/aggregate/aggregate_test.go
+++ b/pkg/pipeline/extract/aggregate/aggregate_test.go
@@ -145,5 +145,5 @@ func Test_GetMetrics(t *testing.T) {
 	require.Equal(t, len(metrics), 1)
 	require.Equal(t, metrics[0]["name"], aggregate.Definition.Name)
 	valueFloat64 := metrics[0]["total_value"].(float64)
-	require.Equal(t, valueFloat64, float64(7))
+	require.Equal(t, float64(7), valueFloat64)
 }
diff --git a/pkg/pipeline/extract/timebased/filters.go b/pkg/pipeline/extract/timebased/filters.go
index 060e386a2..c556ec975 100644
--- a/pkg/pipeline/extract/timebased/filters.go
+++ b/pkg/pipeline/extract/timebased/filters.go
@@ -84,15 +84,19 @@ func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time)
 			// entry is out of time range; ignore it
 			continue
 		}
-		valueFloat64, _ := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey])
-		nItems++
-		switch fs.Rule.OperationType {
-		case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
-			currentValue += valueFloat64
-		case api.FilterOperationName("FilterOperationMax"):
-			currentValue = math.Max(currentValue, valueFloat64)
-		case api.FilterOperationName("FilterOperationMin"):
-			currentValue = math.Min(currentValue, valueFloat64)
+		if valueFloat64, err := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey]); err != nil {
+			// Log as debug to avoid performance impact
+			log.Debugf("CalculateValue error with OperationKey %s: %v", fs.Rule.OperationKey, err)
+		} else {
+			nItems++
+			switch fs.Rule.OperationType {
+			case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
+				currentValue += valueFloat64
+			case api.FilterOperationName("FilterOperationMax"):
+				currentValue = math.Max(currentValue, valueFloat64)
+			case api.FilterOperationName("FilterOperationMin"):
+				currentValue = math.Min(currentValue, valueFloat64)
+			}
 		}
 	}
 	if fs.Rule.OperationType == api.FilterOperationName("FilterOperationAvg") && nItems > 0 {
diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go
index eb5e895ad..ddce03ac9 100644
--- a/pkg/pipeline/ingest/ingest_kafka.go
+++ b/pkg/pipeline/ingest/ingest_kafka.go
@@ -175,8 +175,12 @@ func (k *ingestKafka) reportStats() {
 func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
 	klog.Debugf("entering NewIngestKafka")
 	jsonIngestKafka := api.IngestKafka{}
-	if params.Ingest != nil && params.Ingest.Kafka != nil {
-		jsonIngestKafka = *params.Ingest.Kafka
+	var ingestType string
+	if params.Ingest != nil {
+		ingestType = params.Ingest.Type
+		if params.Ingest.Kafka != nil {
+			jsonIngestKafka = *params.Ingest.Kafka
+		}
 	}
 
 	// connect to the kafka server
@@ -278,7 +282,7 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
 	}
 
 	in := make(chan []byte, 2*bml)
-	metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
+	metrics := newMetrics(opMetrics, params.Name, ingestType, func() int { return len(in) })
 
 	return &ingestKafka{
 		kafkaReader: kafkaReader,
diff --git a/pkg/pipeline/ingest/ingest_synthetic.go b/pkg/pipeline/ingest/ingest_synthetic.go
index d133da060..c8da4e1e1 100644
--- a/pkg/pipeline/ingest/ingest_synthetic.go
+++ b/pkg/pipeline/ingest/ingest_synthetic.go
@@ -85,7 +85,7 @@ func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
 func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
 	log.Debugf("entering NewIngestSynthetic")
 	confIngestSynthetic := api.IngestSynthetic{}
-	if params.Ingest != nil || params.Ingest.Synthetic != nil {
+	if params.Ingest != nil && params.Ingest.Synthetic != nil {
 		confIngestSynthetic = *params.Ingest.Synthetic
 	}
 	if confIngestSynthetic.Connections == 0 {
diff --git a/pkg/pipeline/write/write_ipfix.go b/pkg/pipeline/write/write_ipfix.go
index 7a66b6458..724eda370 100644
--- a/pkg/pipeline/write/write_ipfix.go
+++ b/pkg/pipeline/write/write_ipfix.go
@@ -537,7 +537,7 @@ func NewWriteIpfix(params config.StageParam) (Writer, error) {
 	// Initialize IPFIX registry and send templates
 	registry.LoadRegistry()
 	var err error
-	if params.Write.Ipfix.EnterpriseID != 0 {
+	if params.Write != nil && params.Write.Ipfix != nil && params.Write.Ipfix.EnterpriseID != 0 {
 		err = loadCustomRegistry(writeIpfix.enrichEnterpriseID)
 		if err != nil {
 			ilog.Fatalf("Failed to load Custom(%d) Registry", writeIpfix.enrichEnterpriseID)