From 149565b4a060bc37699018a5ac2d5634adbb8a89 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 18 Jul 2023 15:53:44 +0900 Subject: [PATCH 1/2] output: Handle Fluent Bit V2 metadata format Signed-off-by: Hiroshi Hatake --- output/decoder.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/output/decoder.go b/output/decoder.go index 0242990..ffdf5b3 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -82,7 +82,22 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in return -2, 0, nil } - t := slice.Index(0).Interface() + var t interface{} + ts = slice.Index(0).Interface() + switch ty := ts.(type) { + case FLBTime: + t = ty + case uint64: + t = ty + case []interface{}: // for Fluent Bit V2 metadata type of format + s := reflect.ValueOf(ty) + if s.Kind() != reflect.Slice || s.Len() < 2 { + return -4, 0, nil + } + t = s.Index(0).Interface() + default: + return -5, 0, nil + } data := slice.Index(1) map_data, ok := data.Interface().(map[interface{}]interface{}) From 4c65c5a7d30cdec51c9bd1a0e15601fa8cb1b73b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 24 Jul 2023 14:58:18 +0900 Subject: [PATCH 2/2] decoder: Add a decoder test case for Fluent Bit V2 format Signed-off-by: Hiroshi Hatake --- output/decoder_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/output/decoder_test.go b/output/decoder_test.go index 53e3cb8..24d0f93 100644 --- a/output/decoder_test.go +++ b/output/decoder_test.go @@ -33,6 +33,19 @@ var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */ 0x01, /* fix int 1 */ } +// dummyV2Record should be byte Array, not Slice to be able to Cast c array. +var dummyV2Record [39]byte = [39]byte{0xdd, /* array 32 */ 0x00, 0x00, 0x00, + 0x02, /* count of array elements */ + 0xdd, /* array 32 */ 0x00, 0x00, 0x00, + 0x02, /* count of array elements */ + 0xd7, 0x00, 0x64, 0xbe, 0x0e, 0xeb, 0x16, 0x36, 0xe1, 0x28, 0x80, /* 2023/07/24 14:40:59 */ + 0x82, /* fix map 2 */ + 0xa7, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, /* fix str 7 "compact" */ + 0xc3, /* true */ + 0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */ + 0x01, /* fix int 1 */ +} + func TestGetRecord(t *testing.T) { dec := NewDecoder(unsafe.Pointer(&dummyRecord), len(dummyRecord)) if dec == nil { @@ -63,3 +76,34 @@ func TestGetRecord(t *testing.T) { t.Errorf(`record["schema"] is not 1 %d`, v) } } + +func TestGetV2Record(t *testing.T) { + dec := NewDecoder(unsafe.Pointer(&dummyV2Record), len(dummyV2Record)) + if dec == nil { + t.Fatal("dec is nil") + } + + ret, timestamp, record := GetRecord(dec) + if ret < 0 { + t.Fatalf("ret is negative: code %v", ret) + } + + // test timestamp + ts, ok := timestamp.(FLBTime) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp)) + } + + if ts.Unix() != int64(0x64be0eeb) { + t.Errorf("ts.Unix() error. given %d", ts.Unix()) + } + + // test record + v, ok := record["schema"].(int64) + if !ok { + t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"])) + } + if v != 1 { + t.Errorf(`record["schema"] is not 1 %d`, v) + } +}