Skip to content

Commit

Permalink
Setup template output plugin that writes to file (#2)
Browse files Browse the repository at this point in the history
Co-authored-by: David Marcovitch <[email protected]>
  • Loading branch information
davemarco and David Marcovitch authored Jun 11, 2024
1 parent 38744c9 commit 6ccee05
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ linters-settings:
- "dupSubExpr"
goimports:
# Put imports beginning with prefix after 3rd-party packages.
local-prefixes: "github.com/y-scope/clp-ffi-go"
local-prefixes: "github.com/y-scope/fluent-bit-clp"
nakedret:
# Completely disallow naked returns
max-func-lines: 0
Expand Down
74 changes: 74 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Package implements loading of fluent-bit configuration file. Allows plugin to access values
// defined in configuration file.

package config

import (
"errors"
"fmt"
"log"
"unsafe"

"github.com/fluent/fluent-bit-go/output"
)

// Holds settings for s3 clp plugin from user defined fluent-bit configuration file.
type S3Config struct {
Id string
Path string
File string
}

// Generates configuration struct containing user-defined settings.
//
// Parameters:
// - plugin: fluent-bit plugin reference
//
// Returns:
// - S3Config: Configuration based on fluent-bit.conf
// - err: All errors in config wrapped
func S3New(plugin unsafe.Pointer) (*S3Config, error) {

// Slice holds config errors allowing function to return all errors at once instead of
// one at a time. User can fix all errors at once.
configErrors := []error{}

id, errID := getValueFLBConfig(plugin, "Id")
configErrors = append(configErrors, errID)

path, errPath := getValueFLBConfig(plugin, "Path")
configErrors = append(configErrors, errPath)

file, errFile := getValueFLBConfig(plugin, "File")
configErrors = append(configErrors, errFile)

config := &S3Config{
Id: id,
Path: path,
File: file,
}

// Wrap all errors into one error before returning. Automically excludes nil errors.
err := errors.Join(configErrors...)
return config, err
}

// Retrieves individuals values from fluent-bit.conf.
//
// Parameters:
// - plugin: fluent-bit plugin reference
// - configKey: Key from fluent-bit.conf
//
// Returns:
// - configValue
// - err: Error if config value is blank
func getValueFLBConfig(plugin unsafe.Pointer, configKey string) (string, error) {
configValue := output.FLBPluginConfigKey(plugin, configKey)

if configValue == "" {
err := fmt.Errorf("%s is not defined in fluent-bit configuration", configKey)
return configValue, err
}
log.Printf("fluent-bit config key %s set to value %s", configKey, configValue)
return configValue, nil
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
module github.com/y-scope/fluent-bit-clp

go 1.22.3

require github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c

require github.com/ugorji/go/codec v1.1.7 // indirect
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c h1:yKN46XJHYC/gvgH2UsisJ31+n4K3S7QYZSfU2uAWjuI=
github.com/fluent/fluent-bit-go v0.0.0-20230731091245-a7a013e2473c/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
4 changes: 4 additions & 0 deletions internal/constant/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package contains constants.
package constant

const S3PluginName = "out_clp_s3"
33 changes: 33 additions & 0 deletions plugins/out_clp_s3/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Builds plugin binary in go container and then runs in fluent-bit container.

# Using bullseye tag to match debian version from fluent-bit image [fluent-bit Debian version].
# Matching debian versions prevents glibc compatibility issues.
# [fluent-bit Debian version]: https://github.com/fluent/fluent-bit/blob/master/dockerfiles/Dockerfile
FROM golang:1.22.3-bullseye as builder

# install task
RUN sh -c "$(curl --location https://taskfile.dev/install.sh)" -- -d -b /bin

WORKDIR /root

ENV GOOS=linux\
GOARCH=amd64

COPY / /root/

RUN go mod download

WORKDIR /root/plugins/out_clp_s3

RUN task build

FROM fluent/fluent-bit:3.0.6

# Copy plugin binary to fluent-bit image.
COPY --from=builder /root/plugins/out_clp_s3/out_clp_s3.so /fluent-bit/bin/
COPY --from=builder /root/plugins/out_clp_s3/*.conf /fluent-bit/etc/

# Port for listening interface for HTTP Server.
EXPOSE 2020

CMD ["/fluent-bit/bin/fluent-bit", "--config", "/fluent-bit/etc/fluent-bit.conf"]
51 changes: 51 additions & 0 deletions plugins/out_clp_s3/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
# S3 CLP output plugin

Output plugin for fluent-bit that sends records in CLP IR format to AWS S3.

### Getting Started

You can start the plugin
- [Using Docker](#using-docker)
- [Using local setup](#using-local-setup)

#### Using Docker

First build the image
```shell
docker build ../../ -t fluent-bit-clp -f Dockerfile
```

Start a container
```shell
docker run -it -v $(pwd):/root/fluent-bit/logs:rw --rm fluent-bit-clp
```

Dummy logs will be written to your current working directory.

#### Using local setup

Install [go][1] and [fluent-bit][2]

Run task to build a binary in the plugin directory
```shell
task build
```
Change [plugin-config.conf](plugin-config.conf) to reference the plugin binary
```shell
[PLUGINS]
Path /<LOCAL_PATH>/out_clp_s3.so
```

Change [fluent-bit.conf](fluent-bit.conf) to specify
- Id (output name),
- Path (output directory)
- File (output file name)

```shell
[OUTPUT]
name out_clp_s3
Id <OUTPUT_NAME>
Path <OUTPUT_DIRECTORY>
File <OUTPUT_FILE_NAME>
match *
```

[1]: https://go.dev/doc/install
[2]: https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit
15 changes: 15 additions & 0 deletions plugins/out_clp_s3/Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: '3'

tasks:
build:
cmds:
- go build -buildmode=c-shared -o out_clp_s3.so
sources:
- ./**/*.go
generates:
- out_clp_s3.h
- out_clp_s3.go

clean:
cmds:
- rm -rf *.so *.h *~
53 changes: 53 additions & 0 deletions plugins/out_clp_s3/fluent-bit.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#Sample fluent-bit configuration with output set to clp s3 plugin.

[SERVICE]
# Flush
# =====
# set an interval of seconds before to flush records to a destination
flush 1

# Daemon
# ======
# instruct Fluent Bit to run in foreground or background mode.
daemon Off

# Log_Level
# =========
# Set the verbosity level of the service, values can be:
#
# - error
# - warning
# - info
# - debug
# - trace
#
# by default 'info' is set, that means it includes 'error' and 'warning'.
log_level info

# Plugins File
# ============
# specify an optional 'Plugins' configuration file to load external plugins.
plugins_file /fluent-bit/etc/plugin-config.conf
# plugins_file plugin-config.conf

# HTTP Server
# ===========
# Enable/Disable the built-in HTTP Server for metrics.
http_server Off
http_listen 0.0.0.0
http_port 2020

[INPUT]
name cpu
tag cpu.local

# Read interval (sec) Default: 1
interval_sec 1

[OUTPUT]
name out_clp_s3
Id dummy_metrics
Path /root/fluent-bit/logs
#Path ./
File dummy.txt
match *
88 changes: 88 additions & 0 deletions plugins/out_clp_s3/flush/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package implements methods to send data to output. All data provided by fluent-bit is encoded
// with msgpack.

package flush

import (
"C"
"bufio"
"fmt"
"os"
"path/filepath"
"time"
"unsafe"

"github.com/fluent/fluent-bit-go/output"

"github.com/y-scope/fluent-bit-clp/config"
)

// Flushes data to file.
//
// Parameters:
// - data: msgpack data
// - length: Byte length
// - tag: fluent-bit tag
// - S3Config: Configuration based on fluent-bit.conf
//
// Returns:
// - err: Error if flush fails
func File(data unsafe.Pointer, length int, tag string, config *config.S3Config) error {
fullFilePath := filepath.Join(config.Path, config.File)

// If the file doesn't exist, create it, or append to the file. Will still cause error if there
// is no directory
f, err := os.OpenFile(fullFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return err
}

defer f.Close()

/* ================== This code is mostly boilerplate [fluent-bit reference] ================== */
// Temporary changes were made to boilerplate so that writes to file instead of stdout.
// TODO: Update code so converts to IR and sends to s3.
// [fluent-bit reference]: https://github.com/fluent/fluent-bit-go/blob/a7a013e2473cdf62d7320822658d5816b3063758/examples/out_multiinstance/out.go#L41
// nolint:revive
dec := output.NewDecoder(data, length)

// Buffered writer improves performance and simplifies error handling. Checking for error when
// flushing simplifies retry since nothing before error is written to file (written to buffer
// instead). Buffer size set to value provided by fluent-bit to prevent overflow errors.
w := bufio.NewWriterSize(f, length)

count := 0
for {
ret, ts, record := output.GetRecord(dec)
if ret != 0 {
break
}

var timestamp time.Time
switch t := ts.(type) {
case output.FLBTime:
timestamp = ts.(output.FLBTime).Time
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
fmt.Println("time provided invalid, defaulting to now.")
timestamp = time.Now()
}

// Temporary change to boilerplate so writes to file.
// TODO: Update code so converts to IR and sends to s3
_, _ = w.WriteString(fmt.Sprintf("[%d] %s: [%s, {", count, tag, timestamp.String()))

for k, v := range record {
_, _ = w.WriteString(fmt.Sprintf("\"%s\": %v, ", k, v))
}

_, _ = w.WriteString("}\n")
count++
}
/* ================== End of boilerplate ================== */

// If an error occurs writing to a Writer, Writer.Flush will return the error.
err = w.Flush()
return err
}
Loading

0 comments on commit 6ccee05

Please sign in to comment.