Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jaskerv committed Jun 15, 2023
1 parent f0b81a0 commit 3ac0bc4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 18 deletions.
5 changes: 3 additions & 2 deletions enricher/eks/eks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Test_EnrichRecord(t *testing.T) {
mappings.LOG_FIELD_NAME: dummyLog,
mappings.KUBERNETES_RESOURCE_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_LABELS_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_LABELS_NAME: "service name",
mappings.KUBERNETES_LABELS_NAME: "service name",
},
// default value, check if this isn't removed
"key": "value",
Expand Down Expand Up @@ -84,7 +84,7 @@ func Test_EnrichRecord(t *testing.T) {
mappings.KUBERNETES_RESOURCE_CLUSTER_NAME: defaultEnricher.K8sClusterName,
mappings.KUBERNETES_RESOURCE_NODE_NAME: defaultEnricher.K8sNodeName,
mappings.KUBERNETES_LABELS_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_LABELS_NAME: "service name",
mappings.KUBERNETES_LABELS_NAME: "service name",
},
},
mappings.OBSERVED_TIMESTAMP: ExpectedTime,
Expand Down Expand Up @@ -120,6 +120,7 @@ func Test_EnrichRecord(t *testing.T) {
delete(expected, mappings.LOG_FIELD_NAME)
k8s := expected[mappings.KUBERNETES_RESOURCE_FIELD_NAME].(map[interface{}]interface{})
delete(k8s, "key")
delete(k8s, "labels")
expected[mappings.KUBERNETES_RESOURCE_FIELD_NAME] = k8s
expected[mappings.MESSAGE_FIELD_NAME] = "message"
expected[mappings.TRANSPORT_FIELD_NAME] = "transport"
Expand Down
26 changes: 10 additions & 16 deletions enricher/eks/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eks

import (
"context"
"fmt"

"github.com/canva/amazon-kinesis-streams-for-fluent-bit/enricher/mappings"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/metricserver"
Expand All @@ -19,7 +20,7 @@ type EnricherMetric struct {
func WithMetricServer(ms *metricserver.MetricServer) EnricherConfiguration {
return func(e *Enricher) error {
meter := ms.GetMeter("github.com/canva/amazon-kinesis-streams-for-fluent-bit/enricher/eks")
outputRecordCount, err := meter.Int64Counter("fluentbit.output.record", metric.WithDescription("output record counter"))
outputRecordCount, err := meter.Int64Counter("fluentbit.output.record.count", metric.WithDescription("output record counter"))

if err != nil {
return err
Expand All @@ -46,7 +47,7 @@ func (e *Enricher) AddRecordCount(record map[interface{}]interface{}, recordType
return
}

var serviceName = inferServiceName(record, recordType)
var serviceName = inferServiceName(record)

e.metric.outputRecordCount.Add(context.TODO(), 1, metric.WithAttributes(attribute.Key(mappings.RESOURCE_SERVICE_NAME).String(serviceName)))
}
Expand Down Expand Up @@ -76,21 +77,14 @@ func (e *Enricher) AddDropCount() {
// // e.metric.outputSizseCount.Add(context.TODO(), int64(len(jsonStr)), metric.WithAttributes(attribute.Key(mappings.RESOURCE_SERVICE_NAME).String(serviceName)))
// }

func inferServiceName(record map[interface{}]interface{}, recordType int) string {
var serviceName string

switch recordType {
case TYPE_APPLICATION:
k8sPayload := record[mappings.KUBERNETES_RESOURCE_FIELD_NAME].(map[interface{}]interface{})
labels, labelsExist := k8sPayload[mappings.KUBERNETES_LABELS_FIELD_NAME].(map[interface{}]interface{})
if labelsExist {
if val, ok := labels[mappings.KUBERNETES_LABELS_NAME]; ok {
serviceName = val.(string)
}
}
func inferServiceName(record map[interface{}]interface{}) string {
k8sPayload := record[mappings.KUBERNETES_RESOURCE_FIELD_NAME].(map[interface{}]interface{})
var serviceName = fmt.Sprintf("%v", k8sPayload[mappings.KUBERNETES_CONTAINER_NAME])

if serviceName == "" {
serviceName = k8sPayload[mappings.KUBERNETES_CONTAINER_NAME].(string)
labels, labelsExist := k8sPayload[mappings.KUBERNETES_LABELS_FIELD_NAME].(map[interface{}]interface{})
if labelsExist {
if val, ok := labels[mappings.KUBERNETES_LABELS_NAME]; ok {
serviceName = fmt.Sprintf("%v", val)
}
}

Expand Down
69 changes: 69 additions & 0 deletions enricher/eks/metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package eks

import (
"testing"

"github.com/canva/amazon-kinesis-streams-for-fluent-bit/enricher/mappings"
"github.com/stretchr/testify/assert"
)

func Test_InferServiceName(t *testing.T) {
type TestCase struct {
Test string
Input map[interface{}]interface{}
Expected string
}

testCases := []TestCase{
{
Test: "Application",
Input: map[interface{}]interface{}{
mappings.KUBERNETES_RESOURCE_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_CONTAINER_NAME: "container name",
mappings.KUBERNETES_LABELS_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_LABELS_NAME: "rpc",
},
},
},
Expected: "rpc",
},
{
Test: "Application Non String",
Input: map[interface{}]interface{}{
mappings.KUBERNETES_RESOURCE_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_CONTAINER_NAME: "container name",
mappings.KUBERNETES_LABELS_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_LABELS_NAME: -123,
},
},
},
Expected: "-123",
},
{
Test: "Container Name",
Input: map[interface{}]interface{}{
mappings.KUBERNETES_RESOURCE_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_CONTAINER_NAME: "container name",
},
},
Expected: "container name",
},
{
Test: "Container Name Non String",
Input: map[interface{}]interface{}{
mappings.KUBERNETES_RESOURCE_FIELD_NAME: map[interface{}]interface{}{
mappings.KUBERNETES_CONTAINER_NAME: -123,
},
},
Expected: "-123",
},
}

for _, tc := range testCases {
t.Run(tc.Test, func(t *testing.T) {
actual := inferServiceName(tc.Input)

assert.Equal(t, tc.Expected, actual)
})
}
}

0 comments on commit 3ac0bc4

Please sign in to comment.