Skip to content

Commit

Permalink
Merge pull request #68 from fluent/cosmo0920-handle-fluent-bit-v2-eve…
Browse files Browse the repository at this point in the history
…nt-format

output: Handle Fluent Bit V2 metadata format
  • Loading branch information
niedbalski authored Jul 31, 2023
2 parents b93d969 + 4c65c5a commit a7a013e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
17 changes: 16 additions & 1 deletion output/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,22 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in
return -2, 0, nil
}

t := slice.Index(0).Interface()
var t interface{}
ts = slice.Index(0).Interface()
switch ty := ts.(type) {
case FLBTime:
t = ty
case uint64:
t = ty
case []interface{}: // for Fluent Bit V2 metadata type of format
s := reflect.ValueOf(ty)
if s.Kind() != reflect.Slice || s.Len() < 2 {
return -4, 0, nil
}
t = s.Index(0).Interface()
default:
return -5, 0, nil
}
data := slice.Index(1)

map_data, ok := data.Interface().(map[interface{}]interface{})
Expand Down
44 changes: 44 additions & 0 deletions output/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */
0x01, /* fix int 1 */
}

// dummyV2Record should be byte Array, not Slice to be able to Cast c array.
var dummyV2Record [39]byte = [39]byte{0xdd, /* array 32 */ 0x00, 0x00, 0x00,
0x02, /* count of array elements */
0xdd, /* array 32 */ 0x00, 0x00, 0x00,
0x02, /* count of array elements */
0xd7, 0x00, 0x64, 0xbe, 0x0e, 0xeb, 0x16, 0x36, 0xe1, 0x28, 0x80, /* 2023/07/24 14:40:59 */
0x82, /* fix map 2 */
0xa7, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, /* fix str 7 "compact" */
0xc3, /* true */
0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */
0x01, /* fix int 1 */
}

func TestGetRecord(t *testing.T) {
dec := NewDecoder(unsafe.Pointer(&dummyRecord), len(dummyRecord))
if dec == nil {
Expand Down Expand Up @@ -63,3 +76,34 @@ func TestGetRecord(t *testing.T) {
t.Errorf(`record["schema"] is not 1 %d`, v)
}
}

func TestGetV2Record(t *testing.T) {
dec := NewDecoder(unsafe.Pointer(&dummyV2Record), len(dummyV2Record))
if dec == nil {
t.Fatal("dec is nil")
}

ret, timestamp, record := GetRecord(dec)
if ret < 0 {
t.Fatalf("ret is negative: code %v", ret)
}

// test timestamp
ts, ok := timestamp.(FLBTime)
if !ok {
t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp))
}

if ts.Unix() != int64(0x64be0eeb) {
t.Errorf("ts.Unix() error. given %d", ts.Unix())
}

// test record
v, ok := record["schema"].(int64)
if !ok {
t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"]))
}
if v != 1 {
t.Errorf(`record["schema"] is not 1 %d`, v)
}
}

0 comments on commit a7a013e

Please sign in to comment.