Skip to content

Commit

Permalink
add cmetrics decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
gracewehner committed Dec 16, 2024
1 parent a7a013e commit d8a1166
Showing 1 changed file with 145 additions and 23 deletions.
168 changes: 145 additions & 23 deletions output/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package output
import (
"C"
"encoding/binary"
"fmt"
"reflect"
"strings"
"time"
"unsafe"

"github.com/mitchellh/mapstructure"
"github.com/ugorji/go/codec"
)

Expand Down Expand Up @@ -55,6 +58,116 @@ func (f FLBTime) UpdateExt(dest interface{}, v interface{}) {
panic("unsupported")
}

type AggregationType int64

const (
UNSPECIFIED AggregationType = 0
DELTA AggregationType = 1
CUMMULATIVE AggregationType = 2
)

func (at AggregationType) String() string {
switch at {
case UNSPECIFIED:
return "unspecified"
case DELTA:
return "delta"
case CUMMULATIVE:
return "cumulative"
default:
return ""
}
}

type MetricType int64

const (
COUNTER MetricType = 0
GAUGE MetricType = 1
HISTOGRAM MetricType = 2
SUMMARY MetricType = 3
UNTYPED MetricType = 4
)

func (mt MetricType) String() string {
switch mt {
case COUNTER:
return "counter"
case GAUGE:
return "gauge"
case HISTOGRAM:
return "histogram"
case SUMMARY:
return "summary"
case UNTYPED:
return "untyped"
default:
return ""
}
}

type CMetrics struct {
Meta struct {
Cmetrics map[string]interface{} `mapstructure:"cmetrics"`
External map[string]interface{} `mapstructure:"external"`
Processing struct {
StaticLabels []interface{} `mapstructure:"static_labels"`
} `mapstructure:"processing"`
} `mapstructure:"meta"`
Metrics []struct {
Meta struct {
AggregationType AggregationType `mapstructure:"aggregation_type"`
Labels []string `mapstructure:"labels"`
/* Formatted full qualified metric name is: namespace_subsystem_name */
Opts struct {
Desc string `mapstructure:"desc"`
Name string `mapstructure:"name"`
Namespace string `mapstructure:"ns"`
Subsystem string `mapstructure:"ss"`
} `mapstructure:"opts"`
Type MetricType `mapstructure:"type"`
Ver int `mapstructure:"ver"`
} `mapstructure:"meta"`
Values []struct {
Hash int64 `mapstructure:"hash"`
Labels []string `mapstructure:"labels"`
Ts int64 `mapstructure:"ts"`
Value float64 `mapstructure:"value"`
} `mapstructure:"values"`
} `mapstructure:"metrics"`
}

// Use Prometheus text format when printing CMetrics
func (cm CMetrics) String() string {
var ret strings.Builder

for _, metric := range cm.Metrics {
fullMetricName := fmt.Sprintf("%s_%s_%s", metric.Meta.Opts.Namespace, metric.Meta.Opts.Subsystem, metric.Meta.Opts.Name)
ret.WriteString(fmt.Sprintf("# HELP %s %s\n", fullMetricName, metric.Meta.Opts.Desc))
ret.WriteString(fmt.Sprintf("# TYPE %s %s\n", fullMetricName, metric.Meta.Type))

for _, value := range metric.Values {
ret.WriteString(fmt.Sprintf("%s{", fullMetricName))
for i, labelName := range metric.Meta.Labels {
ret.WriteString(fmt.Sprintf("%s=%s", labelName, value.Labels[i]))
if i < len(metric.Meta.Labels)-1 {
ret.WriteString(",")
}
}
ret.WriteString(fmt.Sprintf("} %.0f\n", value.Value))
}
}

return ret.String()
}

// ConvertRecordToCMetrics converts the data returned by GetRecord() to a CMetrics struct
func ConvertRecordToCMetrics(record map[interface{}]interface{}) (cMetrics CMetrics) {
var result CMetrics
mapstructure.WeakDecode(record, &result)
return result
}

func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder {
var b []byte

Expand All @@ -77,33 +190,42 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in
return -1, 0, nil
}

slice := reflect.ValueOf(m)
if slice.Kind() != reflect.Slice || slice.Len() != 2 {
val := reflect.ValueOf(m)
if val.Len() != 2 {
return -2, 0, nil
}

var t interface{}
ts = slice.Index(0).Interface()
switch ty := ts.(type) {
case FLBTime:
t = ty
case uint64:
t = ty
case []interface{}: // for Fluent Bit V2 metadata type of format
s := reflect.ValueOf(ty)
if s.Kind() != reflect.Slice || s.Len() < 2 {
return -4, 0, nil
switch val.Kind() {
case reflect.Map: // Metrics
map_data := val.Interface().(map[interface{}]interface{})
return 0, 0, map_data
case reflect.Slice: // Logs
var t interface{}
ts = val.Index(0).Interface()
switch ty := ts.(type) {
case FLBTime:
t = ty
case uint64:
t = ty
case []interface{}: // for Fluent Bit V2 metadata type of format
s := reflect.ValueOf(ty)
if s.Kind() != reflect.Slice || s.Len() < 2 {
return -4, 0, nil
}
t = s.Index(0).Interface()
default:
return -5, 0, nil
}
t = s.Index(0).Interface()
default:
return -5, 0, nil
}
data := slice.Index(1)
data := val.Index(1)

map_data, ok := data.Interface().(map[interface{}]interface{})
if !ok {
return -3, 0, nil
}
map_data, ok := data.Interface().(map[interface{}]interface{})
if !ok {
return -3, 0, nil
}

return 0, t, map_data
return 0, t, map_data

default: // if the interface is not a map or slice
return -2, 0, nil
}
}

0 comments on commit d8a1166

Please sign in to comment.