Skip to content

Commit

Permalink
Add support for writing each batch of records as a CLP IR file; Repla…
Browse files Browse the repository at this point in the history
…ce Fluent Bit Go MessagePack decoder with our own implementation. (#3)
  • Loading branch information
davemarco authored Jun 25, 2024
1 parent 6ccee05 commit d79e399
Show file tree
Hide file tree
Showing 14 changed files with 566 additions and 198 deletions.
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ output:
issues:
max-issues-per-linter: 0
max-same-issues: 0
exclude-rules:
- linters:
- revive
source: "^\\s*// (?:\\[.+\\]: )?https?://.+"

linters:
disable-all: true
Expand Down
74 changes: 0 additions & 74 deletions config/config.go

This file was deleted.

10 changes: 8 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ module github.com/y-scope/fluent-bit-clp

go 1.22.3

require github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
require (
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb
)

require github.com/ugorji/go/codec v1.1.7 // indirect
require (
github.com/klauspost/compress v1.16.5
github.com/ugorji/go/codec v1.1.7
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI=
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8=
github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw=
99 changes: 99 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Package implements loading of Fluent Bit configuration file. Configuration is accessible by
// output plugin and stored by Fluent Bit engine.

package config

import (
"errors"
"fmt"
"log"
"strconv"
"unsafe"

"github.com/fluent/fluent-bit-go/output"
)

// Holds settings for S3 CLP plugin from user defined Fluent Bit configuration file.
type S3Config struct {
Id string
Path string
File string
UseSingleKey bool
AllowMissingKey bool
SingleKey string
TimeZone string
}

// Generates configuration struct containing user-defined settings.
//
// Parameters:
// - plugin: Fluent Bit plugin reference
//
// Returns:
// - S3Config: Configuration based on fluent-bit.conf
// - err: All errors in config wrapped
func NewS3(plugin unsafe.Pointer) (S3Config, error) {
// TODO: Redo validation to simplify configuration error reporting.
// https://pkg.go.dev/github.com/go-playground/validator/v10

// Slice holds config errors allowing function to return all errors at once instead of
// one at a time. User can fix all errors at once.
configErrors := []error{}

var err error
var config S3Config
config.Id, err = getValueFLBConfig(plugin, "id")
configErrors = append(configErrors, err)

config.Path, err = getValueFLBConfig(plugin, "path")
configErrors = append(configErrors, err)

config.File, err = getValueFLBConfig(plugin, "file")
configErrors = append(configErrors, err)

var UseSingleKey string
UseSingleKey, err = getValueFLBConfig(plugin, "use_single_key")
configErrors = append(configErrors, err)

// Type conversion to bool.
config.UseSingleKey, err = strconv.ParseBool(UseSingleKey)
configErrors = append(configErrors, err)

var AllowMissingKey string
AllowMissingKey, err = getValueFLBConfig(plugin, "allow_missing_key")
configErrors = append(configErrors, err)

// Type conversion to bool.
config.AllowMissingKey, err = strconv.ParseBool(AllowMissingKey)
configErrors = append(configErrors, err)

// Allow nil, so no need to check error.
config.SingleKey, _ = getValueFLBConfig(plugin, "single_key")

config.TimeZone, err = getValueFLBConfig(plugin, "time_zone")
configErrors = append(configErrors, err)

// Wrap all errors into one error before returning. Automically excludes nil errors.
err = errors.Join(configErrors...)
return config, err
}

// Retrieves individual values from fluent-bit.conf.
//
// Parameters:
// - plugin: Fluent Bit plugin reference
// - configKey: Key from fluent-bit.conf
//
// Returns:
// - configValue
// - err: Error if config value is blank
func getValueFLBConfig(plugin unsafe.Pointer, configKey string) (string, error) {
configValue := output.FLBPluginConfigKey(plugin, configKey)

if configValue == "" {
err := fmt.Errorf("%s is not defined in fluent-bit configuration", configKey)
return configValue, err
}
log.Printf("fluent-bit config key %s set to value %s", configKey, configValue)
return configValue, nil
}
4 changes: 0 additions & 4 deletions internal/constant/constant.go

This file was deleted.

154 changes: 154 additions & 0 deletions internal/decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Package implements Msgpack decoder. Fluent Bit Go already has a Msgpack decoder; however, it
// will decode strings as []int8. This has two undesirable consequences.
//
// 1. Printing values with %v may output non-human readable arrays.
//
// 2. Strings in []int8 format marshalled to JSON will output non-human readable base64 encoded
// strings.
//
// To solve these issues, all other plugins such as the [aws firehose plugin], have recursive
// functions which comb through decoded Msgpack structures and convert bytes to strings (effectively
// another decoder). Creating a new decoder to output strings instead of bytes is cleaner,
// removes complex recursive functions, and likely more performant.
//
// [aws firehose plugin]: https://github.com/aws/amazon-kinesis-firehose-for-fluent-bit/blob/dcbe1a0191abd6242182af55547ccf99ee650ce9/plugins/plugins.go#L153
package decoder

import (
"C"
"encoding/binary"
"encoding/json"
"fmt"
"reflect"
"time"
"unsafe"

"github.com/ugorji/go/codec"
)

// Initializes a Msgpack decoder which automatically converts bytes to strings. Decoder has an
// extension setup for a custom Fluent Bit [timestamp format]. During [timestamp encoding],
// Fluent Bit will set the [Msgpack extension type] to "0". This decoder can recognize the
// extension type, and will then decode the custom Fluent Bit timestamp using a specific function
// [ReadExt].
//
// Parameters:
// - data: Msgpack data
// - length: Byte length
//
// Returns:
// - decoder: Msgpack decoder
//
// [timestamp format]: https://github.com/fluent/fluent-bit-docs/blob/master/development/msgpack-format.md#fluent-bit-usage
// [timestamp encoding]: https://github.com/fluent/fluent-bit/blob/2138cee8f4878733956d42d82f6dcf95f0aa9339/src/flb_time.c#L237
// [Msgpack extension type]: https://github.com/msgpack/msgpack/blob/master/spec.md#extension-types
func New(data unsafe.Pointer, length int) *codec.Decoder {
var b []byte
var mh codec.MsgpackHandle

// Decoder settings for string conversion and error handling.
mh.RawToString = true
mh.WriteExt = true
mh.ErrorIfNoArrayExpand = true

// Set up custom extension for Fluent Bit timestamp format.
mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{})

b = C.GoBytes(data, C.int(length))
decoder := codec.NewDecoderBytes(b, &mh)
return decoder
}

// Fluent-bit can encode timestamps in Msgpack [fixext 8] format. Format stores an integer and a
// byte array whose length is 8 bytes. The integer is the type, and the 4 MSBs are the seconds
// (big-endian uint32) and 4 LSBs are nanoseconds.
// [fixext 8]: https://github.com/msgpack/msgpack/blob/master/spec.md#ext-format-family
type FlbTime struct {
time.Time
}

// Updates a value from a []byte.
//
// Parameters:
// - i: Pointer to the registered extension type
// - b: Msgpack data in fixext 8 format
func (f FlbTime) ReadExt(i interface{}, b []byte) {
// Note that ts refers to the same object since i is a pointer.
ts := i.(*FlbTime)
sec := binary.BigEndian.Uint32(b)
nsec := binary.BigEndian.Uint32(b[4:])
ts.Time = time.Unix(int64(sec), int64(nsec))
}

// Function required by codec but not being used by decoder.
func (f FlbTime) WriteExt(interface{}) []byte {
panic("unsupported")
}

// Function required by codec but not being used by decoder.
func (f FlbTime) ConvertExt(v interface{}) interface{} {
return nil
}

// Function required by codec but not being used by decoder.
func (f FlbTime) UpdateExt(dest interface{}, v interface{}) {
panic("unsupported")
}

// Retrieves data and timestamp from Msgpack object.
//
// Parameters:
// - decoder: Msgpack decoder
//
// Returns:
// - timestamp: Timestamp retrieved from Fluent Bit
// - record: JSON record from Fluent Bit with variable amount of keys
// - err: decode error, error retrieving timestamp, error marshalling record
func GetRecord(decoder *codec.Decoder) (interface{}, []byte, error) {
// Expect array of length 2 for timestamp and data. Also intialize expected types for
// timestamp and record
m := [2]interface{}{nil, make(map[string]interface{})}

err := decoder.Decode(&m)
if err != nil {
// io.EOF errors signify chunk is empty. They should be caught and trigger end of decoding.
// Other decoding errors are not expected in normal operation of plugin.
return nil, nil, err
}

// Timestamp is located in first index.
t := m[0]
var timestamp interface{}

// Fluent Bit can provide timestamp in multiple formats, so we use type switch to process
// correctly.
switch v := t.(type) {
// For earlier format [TIMESTAMP, MESSAGE].
case FlbTime:
timestamp = v
case uint64:
timestamp = v
// For fluent-bit V2 metadata type of format [[TIMESTAMP, METADATA], MESSAGE].
case []interface{}:
if len(v) < 2 {
err = fmt.Errorf("error decoding timestamp %v from stream", v)
return nil, nil, err
}
timestamp = v[0]
default:
err = fmt.Errorf("error decoding timestamp %v from stream", v)
return nil, nil, err
}

// Record is located in second index.
record := m[1]

// Marshall record to json.
jsonRecord, err := json.Marshal(record)
if err != nil {
err = fmt.Errorf("failed to marshal record %v: %w", record, err)
return nil, nil, err
}

return timestamp, jsonRecord, nil
}
8 changes: 4 additions & 4 deletions plugins/out_clp_s3/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Builds plugin binary in go container and then runs in fluent-bit container.
# Builds plugin binary in go container and then runs in Fluent Bit container.

# Using bullseye tag to match debian version from fluent-bit image [fluent-bit Debian version].
# Using bullseye tag to match debian version from Fluent Bit image [Fluent Bit Debian version].
# Matching debian versions prevents glibc compatibility issues.
# [fluent-bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile
# [Fluent Bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile
FROM golang:1.22.3-bullseye as builder

# install task
Expand All @@ -23,7 +23,7 @@ RUN task build

FROM fluent/fluent-bit:3.0.6

# Copy plugin binary to fluent-bit image.
# Copy plugin binary to Fluent Bit image.
COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/
COPY --from=builder /root/plugins/out_clp_s3/*.conf /fluent-bit/etc/

Expand Down
Loading

0 comments on commit d79e399

Please sign in to comment.