From d79e399fb1365e874095bad43620f9c25991d738 Mon Sep 17 00:00:00 2001 From: davemarco <83603688+davemarco@users.noreply.github.com> Date: Tue, 25 Jun 2024 17:12:53 -0400 Subject: [PATCH] Add support for writing each batch of records as a CLP IR file; Replace Fluent Bit Go MessagePack decoder with our own implementation. (#3) --- .golangci.yml | 4 + config/config.go | 74 -------- go.mod | 10 +- go.sum | 4 + internal/config/config.go | 99 +++++++++++ internal/constant/constant.go | 4 - internal/decoder/decoder.go | 154 ++++++++++++++++ plugins/out_clp_s3/Dockerfile | 8 +- plugins/out_clp_s3/README.md | 49 +++-- plugins/out_clp_s3/Taskfile.yml | 2 +- plugins/out_clp_s3/fluent-bit.conf | 14 +- plugins/out_clp_s3/flush/flush.go | 246 ++++++++++++++++++++------ plugins/out_clp_s3/out_clp_s3.go | 89 +++++----- plugins/out_clp_s3/plugin-config.conf | 7 +- 14 files changed, 566 insertions(+), 198 deletions(-) delete mode 100644 config/config.go create mode 100644 internal/config/config.go delete mode 100644 internal/constant/constant.go create mode 100644 internal/decoder/decoder.go diff --git a/.golangci.yml b/.golangci.yml index 733b75b..a353528 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,6 +4,10 @@ output: issues: max-issues-per-linter: 0 max-same-issues: 0 + exclude-rules: + - linters: + - revive + source: "^\\s*// (?:\\[.+\\]: )?https?://.+" linters: disable-all: true diff --git a/config/config.go b/config/config.go deleted file mode 100644 index 034e9ee..0000000 --- a/config/config.go +++ /dev/null @@ -1,74 +0,0 @@ -// Package implements loading of fluent-bit configuration file. Allows plugin to access values -// defined in configuration file. - -package config - -import ( - "errors" - "fmt" - "log" - "unsafe" - - "github.com/fluent/fluent-bit-go/output" -) - -// Holds settings for s3 clp plugin from user defined fluent-bit configuration file. -type S3Config struct { - Id string - Path string - File string -} - -// Generates configuration struct containing user-defined settings. -// -// Parameters: -// - plugin: fluent-bit plugin reference -// -// Returns: -// - S3Config: Configuration based on fluent-bit.conf -// - err: All errors in config wrapped -func S3New(plugin unsafe.Pointer) (*S3Config, error) { - - // Slice holds config errors allowing function to return all errors at once instead of - // one at a time. User can fix all errors at once. - configErrors := []error{} - - id, errID := getValueFLBConfig(plugin, "Id") - configErrors = append(configErrors, errID) - - path, errPath := getValueFLBConfig(plugin, "Path") - configErrors = append(configErrors, errPath) - - file, errFile := getValueFLBConfig(plugin, "File") - configErrors = append(configErrors, errFile) - - config := &S3Config{ - Id: id, - Path: path, - File: file, - } - - // Wrap all errors into one error before returning. Automically excludes nil errors. - err := errors.Join(configErrors...) - return config, err -} - -// Retrieves individuals values from fluent-bit.conf. -// -// Parameters: -// - plugin: fluent-bit plugin reference -// - configKey: Key from fluent-bit.conf -// -// Returns: -// - configValue -// - err: Error if config value is blank -func getValueFLBConfig(plugin unsafe.Pointer, configKey string) (string, error) { - configValue := output.FLBPluginConfigKey(plugin, configKey) - - if configValue == "" { - err := fmt.Errorf("%s is not defined in fluent-bit configuration", configKey) - return configValue, err - } - log.Printf("fluent-bit config key %s set to value %s", configKey, configValue) - return configValue, nil -} diff --git a/go.mod b/go.mod index 8a4140b..c0a9aed 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,12 @@ module github.com/y-scope/fluent-bit-clp go 1.22.3 -require github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c +require ( + github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c + github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb +) -require github.com/ugorji/go/codec v1.1.7 // indirect +require ( + github.com/klauspost/compress v1.16.5 + github.com/ugorji/go/codec v1.1.7 +) diff --git a/go.sum b/go.sum index 523259f..3ac5030 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI= github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb h1:MAuKBGpfQIIl63810kEYZxUv8tfpI9y0nZlyi7tS8A8= +github.com/y-scope/clp-ffi-go v0.0.3-0.20240604153926-969c1151cfcb/go.mod h1:EkeM7lP5AWNRcmBWt3MvjAkRx7RT0gzisW4sh+SJYUw= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..c0b4424 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,99 @@ +// Package implements loading of Fluent Bit configuration file. Configuration is accessible by +// output plugin and stored by Fluent Bit engine. + +package config + +import ( + "errors" + "fmt" + "log" + "strconv" + "unsafe" + + "github.com/fluent/fluent-bit-go/output" +) + +// Holds settings for S3 CLP plugin from user defined Fluent Bit configuration file. +type S3Config struct { + Id string + Path string + File string + UseSingleKey bool + AllowMissingKey bool + SingleKey string + TimeZone string +} + +// Generates configuration struct containing user-defined settings. +// +// Parameters: +// - plugin: Fluent Bit plugin reference +// +// Returns: +// - S3Config: Configuration based on fluent-bit.conf +// - err: All errors in config wrapped +func NewS3(plugin unsafe.Pointer) (S3Config, error) { + // TODO: Redo validation to simplify configuration error reporting. + // https://pkg.go.dev/github.com/go-playground/validator/v10 + + // Slice holds config errors allowing function to return all errors at once instead of + // one at a time. User can fix all errors at once. + configErrors := []error{} + + var err error + var config S3Config + config.Id, err = getValueFLBConfig(plugin, "id") + configErrors = append(configErrors, err) + + config.Path, err = getValueFLBConfig(plugin, "path") + configErrors = append(configErrors, err) + + config.File, err = getValueFLBConfig(plugin, "file") + configErrors = append(configErrors, err) + + var UseSingleKey string + UseSingleKey, err = getValueFLBConfig(plugin, "use_single_key") + configErrors = append(configErrors, err) + + // Type conversion to bool. + config.UseSingleKey, err = strconv.ParseBool(UseSingleKey) + configErrors = append(configErrors, err) + + var AllowMissingKey string + AllowMissingKey, err = getValueFLBConfig(plugin, "allow_missing_key") + configErrors = append(configErrors, err) + + // Type conversion to bool. + config.AllowMissingKey, err = strconv.ParseBool(AllowMissingKey) + configErrors = append(configErrors, err) + + // Allow nil, so no need to check error. + config.SingleKey, _ = getValueFLBConfig(plugin, "single_key") + + config.TimeZone, err = getValueFLBConfig(plugin, "time_zone") + configErrors = append(configErrors, err) + + // Wrap all errors into one error before returning. Automically excludes nil errors. + err = errors.Join(configErrors...) + return config, err +} + +// Retrieves individual values from fluent-bit.conf. +// +// Parameters: +// - plugin: Fluent Bit plugin reference +// - configKey: Key from fluent-bit.conf +// +// Returns: +// - configValue +// - err: Error if config value is blank +func getValueFLBConfig(plugin unsafe.Pointer, configKey string) (string, error) { + configValue := output.FLBPluginConfigKey(plugin, configKey) + + if configValue == "" { + err := fmt.Errorf("%s is not defined in fluent-bit configuration", configKey) + return configValue, err + } + log.Printf("fluent-bit config key %s set to value %s", configKey, configValue) + return configValue, nil +} diff --git a/internal/constant/constant.go b/internal/constant/constant.go deleted file mode 100644 index 5bdc265..0000000 --- a/internal/constant/constant.go +++ /dev/null @@ -1,4 +0,0 @@ -// Package contains constants. -package constant - -const S3PluginName = "out_clp_s3" diff --git a/internal/decoder/decoder.go b/internal/decoder/decoder.go new file mode 100644 index 0000000..2092318 --- /dev/null +++ b/internal/decoder/decoder.go @@ -0,0 +1,154 @@ +// Package implements Msgpack decoder. Fluent Bit Go already has a Msgpack decoder; however, it +// will decode strings as []int8. This has two undesirable consequences. +// +// 1. Printing values with %v may output non-human readable arrays. +// +// 2. Strings in []int8 format marshalled to JSON will output non-human readable base64 encoded +// strings. +// +// To solve these issues, all other plugins such as the [aws firehose plugin], have recursive +// functions which comb through decoded Msgpack structures and convert bytes to strings (effectively +// another decoder). Creating a new decoder to output strings instead of bytes is cleaner, +// removes complex recursive functions, and likely more performant. +// +// [aws firehose plugin]: https://github.com/aws/amazon-kinesis-firehose-for-fluent-bit/blob/dcbe1a0191abd6242182af55547ccf99ee650ce9/plugins/plugins.go#L153 +package decoder + +import ( + "C" + "encoding/binary" + "encoding/json" + "fmt" + "reflect" + "time" + "unsafe" + + "github.com/ugorji/go/codec" +) + +// Initializes a Msgpack decoder which automatically converts bytes to strings. Decoder has an +// extension setup for a custom Fluent Bit [timestamp format]. During [timestamp encoding], +// Fluent Bit will set the [Msgpack extension type] to "0". This decoder can recognize the +// extension type, and will then decode the custom Fluent Bit timestamp using a specific function +// [ReadExt]. +// +// Parameters: +// - data: Msgpack data +// - length: Byte length +// +// Returns: +// - decoder: Msgpack decoder +// +// [timestamp format]: https://github.com/fluent/fluent-bit-docs/blob/master/development/msgpack-format.md#fluent-bit-usage +// [timestamp encoding]: https://github.com/fluent/fluent-bit/blob/2138cee8f4878733956d42d82f6dcf95f0aa9339/src/flb_time.c#L237 +// [Msgpack extension type]: https://github.com/msgpack/msgpack/blob/master/spec.md#extension-types +func New(data unsafe.Pointer, length int) *codec.Decoder { + var b []byte + var mh codec.MsgpackHandle + + // Decoder settings for string conversion and error handling. + mh.RawToString = true + mh.WriteExt = true + mh.ErrorIfNoArrayExpand = true + + // Set up custom extension for Fluent Bit timestamp format. + mh.SetBytesExt(reflect.TypeOf(FlbTime{}), 0, &FlbTime{}) + + b = C.GoBytes(data, C.int(length)) + decoder := codec.NewDecoderBytes(b, &mh) + return decoder +} + +// Fluent-bit can encode timestamps in Msgpack [fixext 8] format. Format stores an integer and a +// byte array whose length is 8 bytes. The integer is the type, and the 4 MSBs are the seconds +// (big-endian uint32) and 4 LSBs are nanoseconds. +// [fixext 8]: https://github.com/msgpack/msgpack/blob/master/spec.md#ext-format-family +type FlbTime struct { + time.Time +} + +// Updates a value from a []byte. +// +// Parameters: +// - i: Pointer to the registered extension type +// - b: Msgpack data in fixext 8 format +func (f FlbTime) ReadExt(i interface{}, b []byte) { + // Note that ts refers to the same object since i is a pointer. + ts := i.(*FlbTime) + sec := binary.BigEndian.Uint32(b) + nsec := binary.BigEndian.Uint32(b[4:]) + ts.Time = time.Unix(int64(sec), int64(nsec)) +} + +// Function required by codec but not being used by decoder. +func (f FlbTime) WriteExt(interface{}) []byte { + panic("unsupported") +} + +// Function required by codec but not being used by decoder. +func (f FlbTime) ConvertExt(v interface{}) interface{} { + return nil +} + +// Function required by codec but not being used by decoder. +func (f FlbTime) UpdateExt(dest interface{}, v interface{}) { + panic("unsupported") +} + +// Retrieves data and timestamp from Msgpack object. +// +// Parameters: +// - decoder: Msgpack decoder +// +// Returns: +// - timestamp: Timestamp retrieved from Fluent Bit +// - record: JSON record from Fluent Bit with variable amount of keys +// - err: decode error, error retrieving timestamp, error marshalling record +func GetRecord(decoder *codec.Decoder) (interface{}, []byte, error) { + // Expect array of length 2 for timestamp and data. Also intialize expected types for + // timestamp and record + m := [2]interface{}{nil, make(map[string]interface{})} + + err := decoder.Decode(&m) + if err != nil { + // io.EOF errors signify chunk is empty. They should be caught and trigger end of decoding. + // Other decoding errors are not expected in normal operation of plugin. + return nil, nil, err + } + + // Timestamp is located in first index. + t := m[0] + var timestamp interface{} + + // Fluent Bit can provide timestamp in multiple formats, so we use type switch to process + // correctly. + switch v := t.(type) { + // For earlier format [TIMESTAMP, MESSAGE]. + case FlbTime: + timestamp = v + case uint64: + timestamp = v + // For fluent-bit V2 metadata type of format [[TIMESTAMP, METADATA], MESSAGE]. + case []interface{}: + if len(v) < 2 { + err = fmt.Errorf("error decoding timestamp %v from stream", v) + return nil, nil, err + } + timestamp = v[0] + default: + err = fmt.Errorf("error decoding timestamp %v from stream", v) + return nil, nil, err + } + + // Record is located in second index. + record := m[1] + + // Marshall record to json. + jsonRecord, err := json.Marshal(record) + if err != nil { + err = fmt.Errorf("failed to marshal record %v: %w", record, err) + return nil, nil, err + } + + return timestamp, jsonRecord, nil +} diff --git a/plugins/out_clp_s3/Dockerfile b/plugins/out_clp_s3/Dockerfile index 90c50f8..914916e 100644 --- a/plugins/out_clp_s3/Dockerfile +++ b/plugins/out_clp_s3/Dockerfile @@ -1,8 +1,8 @@ -# Builds plugin binary in go container and then runs in fluent-bit container. +# Builds plugin binary in go container and then runs in Fluent Bit container. -# Using bullseye tag to match debian version from fluent-bit image [fluent-bit Debian version]. +# Using bullseye tag to match debian version from Fluent Bit image [Fluent Bit Debian version]. # Matching debian versions prevents glibc compatibility issues. -# [fluent-bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile +# [Fluent Bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile FROM golang:1.22.3-bullseye as builder # install task @@ -23,7 +23,7 @@ RUN task build FROM fluent/fluent-bit:3.0.6 -# Copy plugin binary to fluent-bit image. +# Copy plugin binary to Fluent Bit image. COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/ COPY --from=builder /root/plugins/out_clp_s3/*.conf /fluent-bit/etc/ diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index f98181f..6df3390 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -1,6 +1,6 @@ # S3 CLP output plugin -Output plugin for fluent-bit that sends records in CLP IR format to AWS S3. +Output plugin for Fluent Bit that sends records in CLP IR format to AWS S3. ### Getting Started @@ -36,18 +36,45 @@ Change [plugin-config.conf](plugin-config.conf) to reference the plugin binary Path //out_clp_s3.so ``` -Change [fluent-bit.conf](fluent-bit.conf) to specify -- Id (output name), -- Path (output directory) -- File (output file name) +Change [fluent-bit.conf](fluent-bit.conf) to suit your needs. +See [Plugin configuration](#plugin-configuration) for description of fields. +Note changing configuration files may break docker setup, so best to copy them first. +Run Fluent Bit ```shell - [OUTPUT] - name out_clp_s3 - Id - Path - File - match * + fluent-bit -c fluent-bit-custom.conf + ``` + +### Plugin configuration + +The following options must be configured in [fluent-bit.conf](fluent-bit.conf) +- `id`: Name of output +- `path`: Directory for output +- `file`: File name prefix. Plugin will generate many files and append a timestamp. +- `use_single_key`: Output the value corresponding to this key, instead of the whole Fluent Bit +record. It is recommended to set this to true. A Fluent Bit record is a JSON-like object, and while +CLP can parse JSON into IR it is not recommended. Key is set with `single_key` and +will typically be set to "log", the default Fluent Bit key for unparsed logs. If this is set to false, +plugin will parse the record as JSON. +- `allow_missing_key`: Fallback to whole record if key is missing from log. If set to false, an error will +be recorded instead. +- `single_key`: Value for single key +- `time_zone`: Time zone of the source producing the log events, so that local times (any time +that is not a unix timestamp) are handled correctly. + +See below for an example: + + ``` +[OUTPUT] + name out_clp_s3 + id dummy_metrics + path ./ + file dummy + use_single_key true + allow_missing_key true + single_key log + time_zone America/Toronto + match * ``` [1]: https://go.dev/doc/install diff --git a/plugins/out_clp_s3/Taskfile.yml b/plugins/out_clp_s3/Taskfile.yml index 341e651..7d3dc03 100644 --- a/plugins/out_clp_s3/Taskfile.yml +++ b/plugins/out_clp_s3/Taskfile.yml @@ -5,7 +5,7 @@ tasks: cmds: - go build -buildmode=c-shared -o out_clp_s3.so sources: - - ./**/*.go + - ../../**/*.go generates: - out_clp_s3.h - out_clp_s3.go diff --git a/plugins/out_clp_s3/fluent-bit.conf b/plugins/out_clp_s3/fluent-bit.conf index a46a841..79df6c4 100644 --- a/plugins/out_clp_s3/fluent-bit.conf +++ b/plugins/out_clp_s3/fluent-bit.conf @@ -1,4 +1,4 @@ -#Sample fluent-bit configuration with output set to clp s3 plugin. +#Sample Fluent Bit configuration with output set to CLP s3 plugin. [SERVICE] # Flush @@ -46,8 +46,12 @@ [OUTPUT] name out_clp_s3 - Id dummy_metrics - Path /root/fluent-bit/logs - #Path ./ - File dummy.txt + id dummy_metrics + path /root/fluent-bit/logs + #path ./ + file dummy + use_single_key true + allow_missing_key true + single_key log + time_zone America/Toronto match * diff --git a/plugins/out_clp_s3/flush/flush.go b/plugins/out_clp_s3/flush/flush.go index f3682d7..1b842e8 100644 --- a/plugins/out_clp_s3/flush/flush.go +++ b/plugins/out_clp_s3/flush/flush.go @@ -1,88 +1,230 @@ -// Package implements methods to send data to output. All data provided by fluent-bit is encoded -// with msgpack. +// Package implements methods to send data to output. All data provided by Fluent Bit is encoded +// with Msgpack. package flush import ( "C" - "bufio" + "encoding/json" "fmt" + "io" + "log" "os" "path/filepath" "time" "unsafe" "github.com/fluent/fluent-bit-go/output" + "github.com/klauspost/compress/zstd" + "github.com/y-scope/clp-ffi-go/ffi" + "github.com/y-scope/clp-ffi-go/ir" - "github.com/y-scope/fluent-bit-clp/config" + "github.com/y-scope/fluent-bit-clp/internal/config" + "github.com/y-scope/fluent-bit-clp/internal/decoder" ) -// Flushes data to file. +// Flushes data to a file in IR format. Decode of Msgpack based on [Fluent Bit reference]. // // Parameters: -// - data: msgpack data +// - data: Msgpack data // - length: Byte length -// - tag: fluent-bit tag -// - S3Config: Configuration based on fluent-bit.conf +// - tag: Fluent Bit tag +// - S3Config: Plugin configuration // // Returns: +// - code: Fluent Bit success code (OK, RETRY, ERROR) // - err: Error if flush fails -func File(data unsafe.Pointer, length int, tag string, config *config.S3Config) error { - fullFilePath := filepath.Join(config.Path, config.File) +// +// [Fluent Bit reference]: https://github.com/fluent/fluent-bit-go/blob/a7a013e2473cdf62d7320822658d5816b3063758/examples/out_multiinstance/out.go#L41 +func ToFile(data unsafe.Pointer, length int, tag string, config *config.S3Config) (int, error) { + // Buffer to store events from Fluent Bit chunk. + var logEvents []ffi.LogEvent - // If the file doesn't exist, create it, or append to the file. Will still cause error if there - // is no directory - f, err := os.OpenFile(fullFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) - if err != nil { - return err + dec := decoder.New(data, length) + + // Loop through all records in Fluent Bit chunk. + for { + ts, record, err := decoder.GetRecord(dec) + if err == io.EOF { + // Chunk decoding finished. Break out of loop and send log events to output. + break + } else if err != nil { + err = fmt.Errorf("error decoding data from stream: %w", err) + return output.FLB_ERROR, err + } + + timestamp := decodeTs(ts) + msg, err := getMessage(record, config) + if err != nil { + err = fmt.Errorf("failed to get message from record: %w", err) + return output.FLB_ERROR, err + } + + event := ffi.LogEvent{ + LogMessage: msg, + Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), + } + logEvents = append(logEvents, event) } + // Create file for IR output. + f, err := createFile(config.Path, config.File) + if err != nil { + return output.FLB_RETRY, err + } defer f.Close() - /* ================== This code is mostly boilerplate [fluent-bit reference] ================== */ - // Temporary changes were made to boilerplate so that writes to file instead of stdout. - // TODO: Update code so converts to IR and sends to s3. - // [fluent-bit reference]: https://github.com/fluent/fluent-bit-go/blob/a7a013e2473cdf62d7320822658d5816b3063758/examples/out_multiinstance/out.go#L41 - // nolint:revive - dec := output.NewDecoder(data, length) + zstdWriter, err := zstd.NewWriter(f) + if err != nil { + err = fmt.Errorf("error opening zstd writer: %w", err) + return output.FLB_RETRY, err + } + defer zstdWriter.Close() - // Buffered writer improves performance and simplifies error handling. Checking for error when - // flushing simplifies retry since nothing before error is written to file (written to buffer - // instead). Buffer size set to value provided by fluent-bit to prevent overflow errors. - w := bufio.NewWriterSize(f, length) + // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using + // FourByteEncoding as default encoding. + irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](length, config.TimeZone) + if err != nil { + err = fmt.Errorf("error opening IR writer: %w", err) + return output.FLB_RETRY, err + } - count := 0 - for { - ret, ts, record := output.GetRecord(dec) - if ret != 0 { - break - } + err = writeIr(irWriter, logEvents) + if err != nil { + err = fmt.Errorf("error while encoding IR: %w", err) + return output.FLB_ERROR, err + } - var timestamp time.Time - switch t := ts.(type) { - case output.FLBTime: - timestamp = ts.(output.FLBTime).Time - case uint64: - timestamp = time.Unix(int64(t), 0) - default: - fmt.Println("time provided invalid, defaulting to now.") - timestamp = time.Now() - } + // Write zstd compressed IR to file. + _, err = irWriter.CloseTo(zstdWriter) + if err != nil { + err = fmt.Errorf("error writting IR to file: %w", err) + return output.FLB_RETRY, err + } + + log.Printf("zstd compressed IR chunk written to %s", f.Name()) + return output.FLB_OK, nil +} - // Temporary change to boilerplate so writes to file. - // TODO: Update code so converts to IR and sends to s3 - _, _ = w.WriteString(fmt.Sprintf("[%d] %s: [%s, {", count, tag, timestamp.String())) +// Decodes timestamp provided by Fluent Bit engine into time.Time. If timestamp cannot be +// decoded, returns system time. +// +// Parameters: +// - ts: Timestamp provided by Fluent Bit +// +// Returns: +// - timestamp: time.Time timestamp +func decodeTs(ts interface{}) time.Time { + var timestamp time.Time + switch t := ts.(type) { + case decoder.FlbTime: + timestamp = t.Time + case uint64: + timestamp = time.Unix(int64(t), 0) + default: + fmt.Printf("time provided invalid, defaulting to now. Invalid type is %T", t) + timestamp = time.Now() + } + return timestamp +} - for k, v := range record { - _, _ = w.WriteString(fmt.Sprintf("\"%s\": %v, ", k, v)) +// Retrieves message from a record object. The message can consist of the entire object or +// just a single key. For a single key, user should set use_single_key to true in fluent-bit.conf. +// In addition user, should set single_key to "log" which is default Fluent Bit key for unparsed +// messages; however, single_key can be set to another value. To prevent failure if the key is +// missing, user can specify allow_missing_key, and behaviour will fallback to the entire object. +// +// Parameters: +// - record: JSON record from Fluent Bit with variable amount of keys +// - config: Configuration based on fluent-bit.conf +// +// Returns: +// - msg: Retrieved message +// - err: Key not found, json.Unmarshal error, string type assertion error +func getMessage(jsonRecord []byte, config *config.S3Config) (string, error) { + // If use_single_key=false, return the entire record. + if !config.UseSingleKey { + return string(jsonRecord), nil + } + + // If use_single_key=true, then look for key in record, and set message to the key's value. + var record map[string]interface{} + err := json.Unmarshal(jsonRecord, &record) + if err != nil { + return "", fmt.Errorf("failed to unmarshal json record %v: %w", jsonRecord, err) + } + + singleKeyMsg, ok := record[config.SingleKey] + if !ok { + // If key not found in record, see if allow_missing_key=true. If missing key is + // allowed, then return entire record. + if config.AllowMissingKey { + return string(jsonRecord), nil + // If key not found in record and allow_missing_key=false, then return an error. + } else { + return "", fmt.Errorf("key %s not found in record %v", config.SingleKey, record) } + } + + stringMsg, ok := singleKeyMsg.(string) + if !ok { + return "", fmt.Errorf("string type assertion for message failed %v", singleKeyMsg) + } + + return stringMsg, nil +} + +// Creates a new file to output IR. A new file is created for every Fluent Bit chunk. +// The system timestamp is added as a suffix. +// +// Parameters: +// - path: Directory path to create to write files inside +// - file: File name prefix +// +// Returns: +// - f: The created file +// - err: Could not create directory, could not create file +func createFile(path string, file string) (*os.File, error) { + // Make directory if does not exist. + err := os.MkdirAll(path, 0o644) + if err != nil { + err = fmt.Errorf("failed to create directory %s: %w", path, err) + return nil, err + } + + currentTime := time.Now() + + // Format the time as a string in RFC3339 format. + timeString := currentTime.Format(time.RFC3339) + + fileWithTs := fmt.Sprintf("%s_%s.zst", file, timeString) + + fullFilePath := filepath.Join(path, fileWithTs) - _, _ = w.WriteString("}\n") - count++ + // If the file doesn't exist, create it. + f, err := os.OpenFile(fullFilePath, os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + err = fmt.Errorf("failed to create file %s: %w", fullFilePath, err) + return nil, err } - /* ================== End of boilerplate ================== */ + return f, nil +} - // If an error occurs writing to a Writer, Writer.Flush will return the error. - err = w.Flush() - return err +// Writes log events to a IR Writer. +// +// Parameters: +// - irWriter: CLP IR writer to write each log event with +// - eventBuffer: A slice of log events to be encoded +// +// Returns: +// - err: error if an event could not be written +func writeIr(irWriter *ir.Writer, eventBuffer []ffi.LogEvent) error { + for _, event := range eventBuffer { + _, err := irWriter.Write(event) + if err != nil { + err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) + return err + } + } + return nil } diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index 9e4a026..782dc6a 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -1,14 +1,13 @@ -// Package defines high-level callback functions required by fluent-bit go plugin documentation. See -// article/repo fo more information [fluent-bit go], [fluent-bit stdout example]. +// Package defines high-level callback functions required by Fluent Bit go plugin documentation. +// See article/repo fo more information [Fluent Bit go], [Fluent Bit stdout example]. // -// [fluent-bit go]: https://docs.fluentbit.io/manual/development/golang-output-plugins -// [fluent-bit stdout example]: https://github.com/fluent/fluent-bit-go/tree/master/examples/out_multiinstance -// nolint:revive - -// Note package name "main" is required by fluent-bit which suppresses go docs. Do not remove -// export, required for use by fluent-bit C calls. +// [Fluent Bit go]: https://docs.fluentbit.io/manual/development/golang-output-plugins +// [Fluent Bit stdout example]: https://github.com/fluent/fluent-bit-go/tree/master/examples/out_multiinstance package main +// Note package name "main" is required by Fluent Bit which suppresses go docs. Do not remove +// export, required for use by Fluent Bit C calls. + import ( "C" "log" @@ -16,72 +15,75 @@ import ( "github.com/fluent/fluent-bit-go/output" - "github.com/y-scope/fluent-bit-clp/config" - "github.com/y-scope/fluent-bit-clp/internal/constant" + "github.com/y-scope/fluent-bit-clp/internal/config" "github.com/y-scope/fluent-bit-clp/plugins/out_clp_s3/flush" ) -// Required fluent-bit registration callback. +const s3PluginName = "out_clp_s3" + +// Required Fluent Bit registration callback. // // Parameters: -// - def: fluent-bit plugin definition +// - def: Fluent Bit plugin definition // // Returns: // - nil // //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { - log.Printf("[%s] Register called", constant.S3PluginName) - return output.FLBPluginRegister(def, constant.S3PluginName, "Clp s3 plugin") + log.Printf("[%s] Register called", s3PluginName) + return output.FLBPluginRegister(def, s3PluginName, "Clp s3 plugin") } -// Required fluent-bit initialization callback. +// Required Fluent Bit initialization callback. // // Parameters: -// - def: fluent-bit plugin reference +// - def: Fluent Bit plugin reference // // Returns: -// - code: fluent-bit success code (OK, RETRY, ERROR) +// - code: Fluent Bit success code (OK, RETRY, ERROR) // //export FLBPluginInit func FLBPluginInit(plugin unsafe.Pointer) int { - // Returns pointer to a config instance based on fluent-bit configuration. - config, err := config.S3New(plugin) + config, err := config.NewS3(plugin) if err != nil { log.Fatalf("Failed to load configuration %s", err) } - log.Printf("[%s] Init called for id: %s", constant.S3PluginName, config.Id) + log.Printf("[%s] Init called for id: %s", s3PluginName, config.Id) - // Set the context for this instance so that params can be retrieved during flush. Context - // should only be set once to avoid race condition. - output.FLBPluginSetContext(plugin, config) + // Set the context for this instance so that params can be retrieved during flush. + output.FLBPluginSetContext(plugin, &config) return output.FLB_OK } -// Required fluent-bit flush callback. +// Required Fluent Bit flush callback. // // Parameters: -// - ctx: fluent-bit plugin context -// - data: msgpack data +// - ctx: Fluent Bit plugin context +// - data: Msgpack data // - length: Byte length -// - tag: fluent-bit tag +// - tag: Fluent Bit tag // // Returns: -// - code: fluent-bit success code (OK, RETRY, ERROR) +// - code: Fluent Bit success code (OK, RETRY, ERROR) // //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { p := output.FLBPluginGetContext(ctx) // Type assert context back into the original type for the Go variable. - config := (p).(*config.S3Config) - log.Printf("[%s] Flush called for id: %s", constant.S3PluginName, config.Id) + config, ok := p.(*config.S3Config) + if !ok { + log.Fatal("Could not read context during flush") + } - err := flush.File(data, int(length), C.GoString(tag), config) + log.Printf("[%s] Flush called for id: %s", s3PluginName, config.Id) + + code, err := flush.ToFile(data, int(length), C.GoString(tag), config) if err != nil { - log.Printf("error flushing data %s", err) - // retry later - return output.FLB_RETRY + log.Printf("error flushing data: %s", err) + // RETRY or ERROR + return code } return output.FLB_OK @@ -89,30 +91,35 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { - log.Printf("[%s] Exit called for unknown instance", constant.S3PluginName) + log.Printf("[%s] Exit called for unknown instance", s3PluginName) return output.FLB_OK } -// Required fluent-bit exit callback. +// Required Fluent Bit exit callback. // // Parameters: -// - ctx: fluent-bit plugin context +// - ctx: Fluent Bit plugin context // // Returns: -// - code: fluent-bit success code (OK, RETRY, ERROR) +// - code: Fluent Bit success code (OK, RETRY, ERROR) // //export FLBPluginExitCtx func FLBPluginExitCtx(ctx unsafe.Pointer) int { p := output.FLBPluginGetContext(ctx) // Type assert context back into the original type for the Go variable. - config := (p).(*config.S3Config) - log.Printf("[%s] Exit called for id: %s", constant.S3PluginName, config.Id) + + config, ok := p.(*config.S3Config) + if !ok { + log.Fatal("Could not read context during flush") + } + + log.Printf("[%s] Exit called for id: %s", s3PluginName, config.Id) return output.FLB_OK } //export FLBPluginUnregister func FLBPluginUnregister(def unsafe.Pointer) { - log.Printf("[%s] Unregister called", constant.S3PluginName) + log.Printf("[%s] Unregister called", s3PluginName) output.FLBPluginUnregister(def) } diff --git a/plugins/out_clp_s3/plugin-config.conf b/plugins/out_clp_s3/plugin-config.conf index dd5c2da..43f24d8 100644 --- a/plugins/out_clp_s3/plugin-config.conf +++ b/plugins/out_clp_s3/plugin-config.conf @@ -1,5 +1,4 @@ -# plugin configuration referenced in fluent-bit configuration +# Plugin configuration referenced in Fluent Bit configuration [PLUGINS] - Path /fluent-bit/bin/out_clp_s3.so - #Path ./out_clp_s3.so - + path /fluent-bit/bin/out_clp_s3.so + #path ./out_clp_s3.so