Add support for buffering records on disk allowing control over upload size. #5
Conversation
Removed code to shorten the PR. Specifically, removed the recovery code that looks for stores on startup and sends them to S3. The recovery logic will be added back in the next PR.
plugins/out_clp_s3/flush/flush.go
Outdated
if ctx.Config.UseDiskBuffer {
	zstdFile, ok := tag.Writer.ZstdBuffer.(*os.File)
	if !ok {
		return fmt.Errorf("error type assertion from buffer to file failed")
	}
	// Seek to the start of the Zstd file so the upload reads from the beginning.
	if _, err := zstdFile.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("error seeking to start of Zstd file: %w", err)
	}
}
I'm not sure if this logic should be in IrZstdWriter.Close, but ideally the underlying Zstd and IR buffers shouldn't leak out of IrZstdWriter.
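A minimal sketch of the encapsulation being suggested, assuming IrZstdWriter exposes its buffer through a method instead of a public field; the method name and the io.Reader return type are illustrative, not the plugin's actual API:

// GetZstdReader is a hypothetical method: it rewinds the writer's own
// buffer so callers never touch ZstdBuffer directly.
func (w *IrZstdWriter) GetZstdReader() (io.Reader, error) {
	// When buffering on disk, the buffer is an *os.File that must be
	// rewound before its contents can be read for upload.
	if f, ok := w.ZstdBuffer.(*os.File); ok {
		if _, err := f.Seek(0, io.SeekStart); err != nil {
			return nil, fmt.Errorf("error seeking to start of Zstd file: %w", err)
		}
	}
	return w.ZstdBuffer, nil
}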
Co-authored-by: davidlion <[email protected]>
Description
The plugin can now accumulate logs on disk and upload them to S3 once a size threshold is reached. Logs are stored using a "trash compactor" design, described below. A recovery mechanism was added for abrupt crashes: on startup, the plugin finds buffered logs stored on disk and sends them to S3. Lastly, I added an index to the object key which increments after each upload.
I wanted to include a timeout threshold in this PR (i.e., send buffered logs to S3 after a timeout even if the size threshold is not reached); however, the PR is already too large. The timeout is non-trivial since we have no easy way to retake execution from Fluent Bit after a timeout if no new logs are sent to the output plugin, so I believe it requires goroutines. I will add the timeout threshold in a follow-up PR; a rough sketch of one approach follows.
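For illustration only, a hedged sketch of a goroutine-based timeout, assuming a flush callback that uploads whatever is buffered; none of these names exist in the plugin yet:

package main

import (
	"context"
	"log"
	"time"
)

// startTimeoutFlush periodically invokes flush so buffered logs are uploaded
// even when no new chunks arrive. All names here are hypothetical.
func startTimeoutFlush(ctx context.Context, timeout time.Duration, flush func() error) {
	go func() {
		ticker := time.NewTicker(timeout)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Upload buffered logs even if the size threshold was not reached.
				if err := flush(); err != nil {
					log.Printf("timeout flush failed: %v", err)
				}
			}
		}
	}()
}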
Trash Compactor Disk Store
When designing the buffer, I had to decide whether to buffer logs on disk uncompressed or compressed. I decided on compressed for a few reasons, although using a compressed buffer introduced challenges related to data recovery and the compression ratio.
A simple approach would be to send all the events destined for one S3 upload to a streaming compressor and only close the stream when the target upload size is reached. However, the streaming compressor keeps frames/blocks open between receipt of Fluent Bit chunks, and open frames/blocks may not be recoverable after an abrupt crash. Therefore, I decided to "compact" each chunk into its own Zstd frame; when the upload size is reached, the stack of frames is sent to S3. As a result, for the majority of runtime, logs are stored as valid Zstd and can be sent to S3 on startup. An end-of-stream byte is appended to the Zstd data on upload to terminate the IR stream. This approach fixes the data-recovery issue; however, if the chunks are small, the compression ratio will be poor.
To fix the compression ratio, I added a second, uncompressed IR buffer. Log events are first converted to uncompressed IR and buffered into "bins"; the uncompressed IR plays the role of uncompressed trash in the "trash compactor". Once a bin is full, it is "compacted" into its own separate Zstd frame. Each bin may contain multiple Fluent Bit chunks, which fixes the poor compression ratio associated with "compacting" each chunk individually. A sketch of the per-bin compaction is shown below.
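A minimal sketch of compacting one bin into an independent Zstd frame, assuming github.com/klauspost/compress/zstd (the encoder the plugin actually uses may differ); compactBin and its parameters are illustrative:

package main

import (
	"os"

	"github.com/klauspost/compress/zstd"
)

// compactBin appends the uncompressed IR bin to the on-disk Zstd buffer as
// one self-terminated frame. Because each frame is closed immediately, the
// file remains valid Zstd between compactions and survives an abrupt crash.
func compactBin(zstdFile *os.File, irBin []byte) error {
	// A fresh encoder per bin yields an independent frame; concatenated
	// frames still form a valid Zstd stream.
	enc, err := zstd.NewWriter(zstdFile)
	if err != nil {
		return err
	}
	if _, err := enc.Write(irBin); err != nil {
		enc.Close()
		return err
	}
	// Close flushes the frame footer; it does not close zstdFile.
	return enc.Close()
}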
Below is a summary of the control flow for the disk buffer:
- Fluent Bit chunks are converted to uncompressed IR and appended to the tag's IR buffer (a "bin").
- When the bin reaches its size threshold, it is compacted into a single Zstd frame and appended to the on-disk Zstd buffer.
- When the Zstd buffer reaches the upload size threshold, an end-of-stream byte is appended and the buffered frames are uploaded to S3.
There is currently no timeout functionality, as mentioned at the top of the PR description. As a result, the plugin will not upload logs if the log quantity stays below the upload size threshold. A timeout is being added in the next PR.
Recovery
On startup, the plugin looks for IR and Zstd files in the store directory and groups them by tag. It then compresses any leftover IR into the corresponding Zstd file and sends it to S3. A sketch of this recovery pass follows.
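A hedged sketch of that startup pass, assuming stores are laid out as "<tag>.ir" and "<tag>.zst" files in one directory; the layout, recoverStores, and the flushTag callback are all assumptions, not the plugin's actual code:

package main

import (
	"os"
	"path/filepath"
	"strings"
)

// recoverStores scans storeDir for leftover IR/Zstd stores, groups them by
// tag, and hands each pair to flushTag, which is expected to compact the IR
// into the Zstd file and upload the result to S3.
func recoverStores(storeDir string, flushTag func(irPath, zstdPath string) error) error {
	entries, err := os.ReadDir(storeDir)
	if err != nil {
		return err
	}
	tags := make(map[string]struct{})
	for _, e := range entries {
		// Assumed naming scheme: "<tag>.ir" and "<tag>.zst".
		ext := filepath.Ext(e.Name())
		if ext == ".ir" || ext == ".zst" {
			tags[strings.TrimSuffix(e.Name(), ext)] = struct{}{}
		}
	}
	for tag := range tags {
		irPath := filepath.Join(storeDir, tag+".ir")
		zstdPath := filepath.Join(storeDir, tag+".zst")
		if err := flushTag(irPath, zstdPath); err != nil {
			return err
		}
	}
	return nil
}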
Index
I added an index to the S3 object key which increments after each upload, preventing key collisions between successive uploads for the same tag. A sketch follows.
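For illustration only, a minimal sketch of such a key, assuming a "<prefix>/<tag>_<index>.zst" layout (the plugin's actual key format may differ):

package main

import "fmt"

// objectKey builds an S3 object key; index is incremented after each upload
// so consecutive uploads for the same tag never collide. The format here is
// an assumption, not the plugin's exact key layout.
func objectKey(prefix, tag string, index int) string {
	return fmt.Sprintf("%s/%s_%d.zst", prefix, tag, index)
}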
Validation performed
Tested that buffering and recovery work as expected with test logs. Verified that files sent to S3 could be archived by CLP.