Commit

Fixes a couple of minor weaknesses (#491)
Some weaknesses were detected; this commit fixes a number of them, especially inconsistent nil-checks and suppressed errors.
jotak authored Sep 14, 2023
1 parent 11a7516 commit 9043619
Showing 8 changed files with 54 additions and 34 deletions.
5 changes: 4 additions & 1 deletion cmd/flowlogs-pipeline/main.go
@@ -106,7 +106,10 @@ func initLogger() {
 }

 func dumpConfig(opts config.Options) {
-    configAsJSON, _ := json.MarshalIndent(opts, "", " ")
+    configAsJSON, err := json.MarshalIndent(opts, "", " ")
+    if err != nil {
+        panic(fmt.Sprintf("error dumping config: %v", err))
+    }
     fmt.Printf("Using configuration:\n%s\n", configAsJSON)
 }

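For context on the new panic: json.MarshalIndent only fails on values that have no JSON representation (channels, functions, cyclic data), so for a plain options struct the panic is defensive rather than expected. A minimal standalone sketch of the error path, not code from this repo:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // A flat options struct marshals fine; unsupported types are what fail.
    if _, err := json.Marshal(make(chan int)); err != nil {
        fmt.Println(err) // json: unsupported type: chan int
    }
}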
9 changes: 7 additions & 2 deletions pkg/pipeline/encode/encode_kafka.go
@@ -49,11 +49,16 @@ type encodeKafka struct {
 // Encode writes entries to kafka topic
 func (r *encodeKafka) Encode(entry config.GenericMap) {
     var entryByteArray []byte
-    entryByteArray, _ = json.Marshal(entry)
+    var err error
+    entryByteArray, err = json.Marshal(entry)
+    if err != nil {
+        log.Errorf("encodeKafka error: %v", err)
+        return
+    }
     msg := kafkago.Message{
         Value: entryByteArray,
     }
-    err := r.kafkaWriter.WriteMessages(context.Background(), msg)
+    err = r.kafkaWriter.WriteMessages(context.Background(), msg)
     if err != nil {
         log.Errorf("encodeKafka error: %v", err)
     } else {
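The shape of the fix: marshal, return early on error, and only then write, so a failed encoding never sends an empty message. A self-contained sketch of the same flow using segmentio/kafka-go; the writer setup, broker address, and topic name here are illustrative, not taken from this repo:

package main

import (
    "context"
    "encoding/json"
    "log"

    kafkago "github.com/segmentio/kafka-go"
)

func encode(w *kafkago.Writer, entry map[string]interface{}) {
    b, err := json.Marshal(entry)
    if err != nil {
        log.Printf("encode error: %v", err) // report instead of discarding
        return                              // and skip the write entirely
    }
    if err := w.WriteMessages(context.Background(), kafkago.Message{Value: b}); err != nil {
        log.Printf("write error: %v", err)
    }
}

func main() {
    w := &kafkago.Writer{Addr: kafkago.TCP("localhost:9092"), Topic: "flows"}
    defer w.Close()
    encode(w, map[string]interface{}{"SrcAddr": "10.0.0.1"})
}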
36 changes: 20 additions & 16 deletions pkg/pipeline/extract/aggregate/aggregate.go
@@ -153,22 +153,26 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
     value, ok := entry[operationKey]
     if ok {
         valueString := fmt.Sprintf("%v", value)
-        valueFloat64, _ := strconv.ParseFloat(valueString, 64)
-        switch operation {
-        case OperationSum:
-            groupState.totalValue += valueFloat64
-            groupState.recentOpValue += valueFloat64
-        case OperationMax:
-            groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
-            groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
-        case OperationMin:
-            groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
-            groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
-        case OperationAvg:
-            groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
-            groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
-        case OperationRawValues:
-            groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
+        if valueFloat64, err := strconv.ParseFloat(valueString, 64); err != nil {
+            // Log as debug to avoid performance impact
+            log.Debugf("UpdateByEntry error when parsing float '%s': %v", valueString, err)
+        } else {
+            switch operation {
+            case OperationSum:
+                groupState.totalValue += valueFloat64
+                groupState.recentOpValue += valueFloat64
+            case OperationMax:
+                groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
+                groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
+            case OperationMin:
+                groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
+                groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
+            case OperationAvg:
+                groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
+                groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
+            case OperationRawValues:
+                groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
+            }
         }
     }
 }
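Behavioral note: the old code discarded ParseFloat's error, and ParseFloat returns 0 for unparseable input alongside that error, so malformed values were silently aggregated as zero. The if-initializer form scopes valueFloat64 and err to the if/else and keeps bad input out entirely. A minimal sketch of that pattern with toy inputs:

package main

import (
    "fmt"
    "strconv"
)

func main() {
    for _, s := range []string{"3.14", "not-a-number"} {
        // v and err only exist inside this if/else; a bad value never
        // escapes into the surrounding code as a silent 0.
        if v, err := strconv.ParseFloat(s, 64); err != nil {
            fmt.Printf("skipping %q: %v\n", s, err)
        } else {
            fmt.Printf("aggregating %v\n", v)
        }
    }
}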
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
@@ -145,5 +145,5 @@ func Test_GetMetrics(t *testing.T) {
     require.Equal(t, len(metrics), 1)
     require.Equal(t, metrics[0]["name"], aggregate.Definition.Name)
     valueFloat64 := metrics[0]["total_value"].(float64)
-    require.Equal(t, valueFloat64, float64(7))
+    require.Equal(t, float64(7), valueFloat64)
 }
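This matches testify's documented parameter order, require.Equal(t, expected, actual), so a failing test labels the two values correctly. A small standalone illustration, not from this repo:

package example

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestArgumentOrder(t *testing.T) {
    got := float64(7)
    // Expected value first: if got were 6, the failure output would read
    // "expected: 7, actual: 6" instead of the misleading reverse.
    require.Equal(t, float64(7), got)
}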
22 changes: 13 additions & 9 deletions pkg/pipeline/extract/timebased/filters.go
@@ -84,15 +84,19 @@ func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time)
             // entry is out of time range; ignore it
             continue
         }
-        valueFloat64, _ := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey])
-        nItems++
-        switch fs.Rule.OperationType {
-        case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
-            currentValue += valueFloat64
-        case api.FilterOperationName("FilterOperationMax"):
-            currentValue = math.Max(currentValue, valueFloat64)
-        case api.FilterOperationName("FilterOperationMin"):
-            currentValue = math.Min(currentValue, valueFloat64)
+        if valueFloat64, err := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey]); err != nil {
+            // Log as debug to avoid performance impact
+            log.Debugf("CalculateValue error with OperationKey %s: %v", fs.Rule.OperationKey, err)
+        } else {
+            nItems++
+            switch fs.Rule.OperationType {
+            case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
+                currentValue += valueFloat64
+            case api.FilterOperationName("FilterOperationMax"):
+                currentValue = math.Max(currentValue, valueFloat64)
+            case api.FilterOperationName("FilterOperationMin"):
+                currentValue = math.Min(currentValue, valueFloat64)
+            }
         }
     }
     if fs.Rule.OperationType == api.FilterOperationName("FilterOperationAvg") && nItems > 0 {
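Worth noting: nItems++ moved inside the else branch, so entries whose value fails conversion no longer inflate the divisor used for FilterOperationAvg. A toy sketch of the difference, with made-up data:

package main

import "fmt"

func main() {
    values := []interface{}{4.0, "oops", 8.0}
    sum, n := 0.0, 0
    for _, v := range values {
        f, ok := v.(float64) // stand-in for utils.ConvertToFloat64
        if !ok {
            continue // previously this entry would still have been counted
        }
        sum += f
        n++
    }
    fmt.Println(sum / float64(n)) // 6; counting the bad entry would give 4
}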
10 changes: 7 additions & 3 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -175,8 +175,12 @@ func (k *ingestKafka) reportStats() {
 func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
     klog.Debugf("entering NewIngestKafka")
     jsonIngestKafka := api.IngestKafka{}
-    if params.Ingest != nil && params.Ingest.Kafka != nil {
-        jsonIngestKafka = *params.Ingest.Kafka
+    var ingestType string
+    if params.Ingest != nil {
+        ingestType = params.Ingest.Type
+        if params.Ingest.Kafka != nil {
+            jsonIngestKafka = *params.Ingest.Kafka
+        }
     }

     // connect to the kafka server
@@ -278,7 +282,7 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
     }

     in := make(chan []byte, 2*bml)
-    metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
+    metrics := newMetrics(opMetrics, params.Name, ingestType, func() int { return len(in) })

     return &ingestKafka{
         kafkaReader: kafkaReader,
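The new ingestType variable fixes one of the inconsistent nil-checks: the later newMetrics call read params.Ingest.Type without a guard, even though the code above already treated params.Ingest as possibly nil. Caching the field once, under the check, removes the latent panic. A minimal sketch using hypothetical stand-in types:

package main

import "fmt"

type Ingest struct{ Type string }
type StageParam struct{ Ingest *Ingest } // Ingest may legitimately be nil

func main() {
    params := StageParam{} // no ingest section configured
    // Read the field once, under the nil-check, instead of at each use site.
    var ingestType string
    if params.Ingest != nil {
        ingestType = params.Ingest.Type
    }
    fmt.Printf("type=%q\n", ingestType) // safe: prints type=""
    // By contrast, params.Ingest.Type here would panic on the nil pointer.
}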
2 changes: 1 addition & 1 deletion pkg/pipeline/ingest/ingest_synthetic.go
@@ -85,7 +85,7 @@ func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
 func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
     log.Debugf("entering NewIngestSynthetic")
     confIngestSynthetic := api.IngestSynthetic{}
-    if params.Ingest != nil || params.Ingest.Synthetic != nil {
+    if params.Ingest != nil && params.Ingest.Synthetic != nil {
         confIngestSynthetic = *params.Ingest.Synthetic
     }
     if confIngestSynthetic.Connections == 0 {
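The || to && change is a genuine bug fix, not a style tweak. With ||, a nil params.Ingest makes Go evaluate the right-hand operand and dereference nil; and a non-nil params.Ingest with a nil Synthetic passes the condition, so the body dereferences nil instead. && short-circuits both paths safely. A minimal sketch using hypothetical stand-in types:

package main

import "fmt"

type Synthetic struct{ Connections int }
type Ingest struct{ Synthetic *Synthetic }
type StageParam struct{ Ingest *Ingest }

func main() {
    params := StageParam{Ingest: &Ingest{}} // Synthetic left nil
    conf := Synthetic{}
    // Buggy variant: params.Ingest != nil is true, the body runs,
    // and *params.Ingest.Synthetic panics on the nil pointer:
    //   if params.Ingest != nil || params.Ingest.Synthetic != nil {
    // Fixed variant: && guards every dereference that follows.
    if params.Ingest != nil && params.Ingest.Synthetic != nil {
        conf = *params.Ingest.Synthetic
    }
    fmt.Println(conf.Connections) // 0: defaults kept, no panic
}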
2 changes: 1 addition & 1 deletion pkg/pipeline/write/write_ipfix.go
@@ -537,7 +537,7 @@ func NewWriteIpfix(params config.StageParam) (Writer, error) {
     // Initialize IPFIX registry and send templates
     registry.LoadRegistry()
     var err error
-    if params.Write.Ipfix.EnterpriseID != 0 {
+    if params.Write != nil && params.Write.Ipfix != nil && params.Write.Ipfix.EnterpriseID != 0 {
         err = loadCustomRegistry(writeIpfix.enrichEnterpriseID)
         if err != nil {
             ilog.Fatalf("Failed to load Custom(%d) Registry", writeIpfix.enrichEnterpriseID)
