From 17c9cda510c0ceb40d480ef3dc35810e2d063f13 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 14:17:09 +0000 Subject: [PATCH 01/13] godoc test --- README.md | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 344991c..187f015 100644 --- a/README.md +++ b/README.md @@ -6,28 +6,28 @@ Repository contains Fluent Bit output plugins that store records in CLP's compre The general flow is as follows: -```mermaid -%%{init: { - 'theme': 'base', - 'themeVariables': { - 'primaryColor': '#0066cc', - 'primaryTextColor': '#fff', - 'primaryBorderColor': 'transparent', - 'lineColor': '#9580ff', - 'secondaryColor': '#9580ff', - 'tertiaryColor': '#fff' - } - } -}%% -flowchart LR - A(Fluent Bit Input) --> B - subgraph CLP Output Plugin - B(Parse into IR) --> C(Compress with Zstd) - end - C --> D(Output) - classDef format fill:#007DF4,color:white - class A,B,C,D format -``` +//mermaid +//%%{init: { +// 'theme': 'base', +// 'themeVariables': { +// 'primaryColor': '#0066cc', +// 'primaryTextColor': '#fff', +// 'primaryBorderColor': 'transparent', +// 'lineColor': '#9580ff', +// 'secondaryColor': '#9580ff', +// 'tertiaryColor': '#fff' +// } +// } +//}%% +//flowchart LR +// A(Fluent Bit Input) --> B +// subgraph CLP Output Plugin +// B(Parse into IR) --> C(Compress with Zstd) +// end +// C --> D(Output) +// classDef format fill:#007DF4,color:white +// class A,B,C,D format + #### Fluent Bit Input From 74e050e4793ea99e775608d5718a53c518b93abf Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 14:24:10 +0000 Subject: [PATCH 02/13] revert test did not work --- README.md | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 187f015..344991c 100644 --- a/README.md +++ b/README.md @@ -6,28 +6,28 @@ Repository contains Fluent Bit output plugins that store records in CLP's compre The general flow is as follows: -//mermaid -//%%{init: { -// 'theme': 'base', -// 'themeVariables': { -// 'primaryColor': '#0066cc', -// 'primaryTextColor': '#fff', -// 'primaryBorderColor': 'transparent', -// 'lineColor': '#9580ff', -// 'secondaryColor': '#9580ff', -// 'tertiaryColor': '#fff' -// } -// } -//}%% -//flowchart LR -// A(Fluent Bit Input) --> B -// subgraph CLP Output Plugin -// B(Parse into IR) --> C(Compress with Zstd) -// end -// C --> D(Output) -// classDef format fill:#007DF4,color:white -// class A,B,C,D format - +```mermaid +%%{init: { + 'theme': 'base', + 'themeVariables': { + 'primaryColor': '#0066cc', + 'primaryTextColor': '#fff', + 'primaryBorderColor': 'transparent', + 'lineColor': '#9580ff', + 'secondaryColor': '#9580ff', + 'tertiaryColor': '#fff' + } + } +}%% +flowchart LR + A(Fluent Bit Input) --> B + subgraph CLP Output Plugin + B(Parse into IR) --> C(Compress with Zstd) + end + C --> D(Output) + classDef format fill:#007DF4,color:white + class A,B,C,D format +``` #### Fluent Bit Input From cba3d2023735eca312f0a588be2ca5db0d64908c Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 17:15:31 +0000 Subject: [PATCH 03/13] interface for irzstd writer --- README.md | 2 +- internal/irzstd/{irzstd.go => disk.go} | 189 +++++---------------- internal/irzstd/memory.go | 141 +++++++++++++++ internal/outctx/context.go | 4 +- internal/outctx/manager.go | 2 +- plugins/out_clp_s3/internal/flush/flush.go | 7 +- 6 files changed, 189 insertions(+), 156 deletions(-) rename internal/irzstd/{irzstd.go => disk.go} (74%) create mode 100644 internal/irzstd/memory.go diff --git a/README.md b/README.md index 344991c..32856b7 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ tailing log files and other Fluent Bit instances. #### CLP Output Plugin -Output plugin recieves logs from Fluent Bit and parses them into [CLP IR][1]. CLP IR consists of a +Output plugin receives logs from Fluent Bit and parses them into [CLP IR][1]. CLP IR consists of a timestamp, a list of variable values, and the log type. IR is then compressed with [Zstd][3] in default mode without dictionaries. diff --git a/internal/irzstd/irzstd.go b/internal/irzstd/disk.go similarity index 74% rename from internal/irzstd/irzstd.go rename to internal/irzstd/disk.go index 9f9e1ee..35ff90a 100644 --- a/internal/irzstd/irzstd.go +++ b/internal/irzstd/disk.go @@ -1,10 +1,6 @@ -// Package implements writer that converts log events to Zstd compressed IR. Effectively chaining -// together [ir.Writer] and [zstd.Encoder] in series. - package irzstd import ( - "bytes" "fmt" "io" "log" @@ -20,17 +16,7 @@ import ( // 2 MB threshold to buffer IR before compressing to Zstd when use_disk_buffer is on. const irSizeThreshold = 2 << 20 -// Converts log events into Zstd compressed IR. Writer can be initialized with use_disk_buffer -// on/off depending on user configuration. -// -// Behavior with use_disk_buffer off: -// Log events provided to writer are immediately converted to Zstd compressed IR and stored in -// [Writer.ZstdBuffer]. With use_disk_buffer off, ZstdBuffer is a memory buffer. After the Zstd -// buffer receives logs, they are immediately sent to s3. There is no IR buffer and it is set to -// nil. -// -// Behavior with use_disk_buffer on: -// Logs events are not immediately converted to Zstd compressed IR, and instead compressed using +// Converts log events into Zstd compressed IR using // "trash compactor" design. Log events are converted to uncompressed IR and buffered into "bins". // Uncompressed IR represents uncompressed trash in "trash compactor". Once the bin is full, the // bin is "compacted" into its own separate Zstd frame. The compressor is explicitly closed after @@ -44,13 +30,12 @@ const irSizeThreshold = 2 << 20 // "bins" (i.e. Fluent Bit chunks could be directly "compacted"); however, if the chunks are // small, the compression ratio would deteriorate. "Trash compactor" design provides protection from // log loss during abrupt crashes and maintains a high compression ratio. -type Writer struct { +type diskWriter struct { useDiskBuffer bool irPath string // Path variable for debugging zstdPath string // Path variable for debugging irFile *os.File zstdFile *os.File - zstdMemBuf *bytes.Buffer irWriter *ir.Writer size int timezone string @@ -58,34 +43,6 @@ type Writer struct { zstdWriter *zstd.Encoder } -// Opens a new [Writer] with a memory buffer for Zstd Output. For use when use_disk_store is off. -// Writer does not use IR buffer. -// -// Parameters: -// - timezone: Time zone of the log source -// - size: Byte length -// -// Returns: -// - Writer: Writer for Zstd compressed IR -// - err: Error opening Zstd/IR writers -func NewMemWriter(timezone string, size int) (*Writer, error) { - var membuf bytes.Buffer - irWriter, zstdWriter, err := newIrZstdWriters(&membuf, timezone, size) - if err != nil { - return nil, err - } - - writer := Writer{ - size: size, - timezone: timezone, - irWriter: irWriter, - zstdWriter: zstdWriter, - zstdMemBuf: &membuf, - } - - return &writer, nil -} - // Opens a new [Writer] using disk files for IR and Zstd buffers. For use when use_disk_store // is on. // @@ -103,7 +60,7 @@ func NewDiskWriter( size int, irPath string, zstdPath string, -) (*Writer, error) { +) (Writer, error) { irFile, zstdFile, err := newFileBuffers(irPath, zstdPath) if err != nil { return nil, err @@ -114,7 +71,7 @@ func NewDiskWriter( return nil, err } - writer := Writer{ + writer := diskWriter{ useDiskBuffer: true, size: size, timezone: timezone, @@ -147,7 +104,7 @@ func RecoverWriter( size int, irPath string, zstdPath string, -) (*Writer, error) { +) (Writer, error) { irFile, zstdFile, err := openBufferFiles(irPath, zstdPath) if err != nil { return nil, fmt.Errorf("error opening files: %w", err) @@ -158,7 +115,7 @@ func RecoverWriter( return nil, err } - writer := Writer{ + writer := diskWriter{ useDiskBuffer: true, size: size, timezone: timezone, @@ -170,7 +127,7 @@ func RecoverWriter( zstdWriter: zstdWriter, } - irFileSize, _, err := writer.GetFileSizes() + irFileSize, err := writer.getIrFileSize() if err != nil { return nil, fmt.Errorf("error getting size of disk buffer: %w", err) } @@ -193,19 +150,12 @@ func RecoverWriter( // // Returns: // - err: Error writing IR/Zstd, error flushing buffers -func (w *Writer) WriteIrZstd(logEvents []ffi.LogEvent) error { - // Write log events to IR writer buffer. +func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { err := writeIr(w.irWriter, logEvents) if err != nil { return err } - // If disk buffering is off, write directly to the Zstd buffer (skipping the IR buffer). - if !w.useDiskBuffer { - _, err = w.irWriter.WriteTo(w.zstdWriter) - return err - } - numBytes, err := w.irWriter.WriteTo(w.irFile) if err != nil { return err @@ -231,18 +181,17 @@ func (w *Writer) WriteIrZstd(logEvents []ffi.LogEvent) error { // // Returns: // - err: Error flushing/closing buffers -func (w *Writer) CloseStreams() error { +func (w *diskWriter) CloseStreams() error { // IR buffer may not be empty, so must be flushed prior to adding trailing EndOfStream byte. If // not using disk buffering, IR writer buffer should always be empty since it is always flushed // to Zstd buffer on write. - if w.useDiskBuffer { - err := w.flushIrBuffer() - if err != nil { - return fmt.Errorf("error flushing IR buffer: %w", err) - } + + err := w.flushIrBuffer() + if err != nil { + return fmt.Errorf("error flushing IR buffer: %w", err) } - _, err := w.irWriter.CloseTo(w.zstdWriter) + _, err = w.irWriter.CloseTo(w.zstdWriter) if err != nil { return err } @@ -254,10 +203,6 @@ func (w *Writer) CloseStreams() error { return err } - if !w.useDiskBuffer { - return nil - } - _, err = w.zstdFile.Seek(0, io.SeekStart) if err != nil { return err @@ -271,19 +216,13 @@ func (w *Writer) CloseStreams() error { // // Returns: // - err: Error opening IR writer, error IR buffer not empty, error with type assertion -func (w *Writer) Reset() error { +func (w *diskWriter) Reset() error { var err error w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) if err != nil { return err } - if !w.useDiskBuffer { - w.zstdMemBuf.Reset() - w.zstdWriter.Reset(w.zstdMemBuf) - return nil - } - // Flush should be called prior to reset, so buffer should be empty. There may be a future // use case to truncate a non-empty IR buffer; however, there is currently no use case // so safer to throw an error. @@ -314,26 +253,16 @@ func (w *Writer) Reset() error { // // Returns: // - err: Error called with useDiskBuffer off, error calling stat -func (w *Writer) GetFileSizes() (int, int, error) { - if !w.useDiskBuffer { - return 0, 0, fmt.Errorf("error cannot get file sizes when useDiskBuffer is off") - } - - irFileInfo, err := w.irFile.Stat() - if err != nil { - return 0, 0, err - } - - irFileSize := int(irFileInfo.Size()) +func (w *diskWriter) GetOutputSize() (int, error) { zstdFileInfo, err := w.zstdFile.Stat() if err != nil { - return 0, 0, err + return 0, err } zstdFileSize := int(zstdFileInfo.Size()) - return irFileSize, zstdFileSize, err + return zstdFileSize, err } // Closes [Writer]. Currently used during recovery only, and advise caution using elsewhere. @@ -344,7 +273,7 @@ func (w *Writer) GetFileSizes() (int, int, error) { // // Returns: // - err: Error closing irWriter, error closing files -func (w *Writer) Close() error { +func (w *diskWriter) Close() error { if w.irWriter != nil { err := w.irWriter.Serializer.Close() if err != nil { @@ -352,10 +281,6 @@ func (w *Writer) Close() error { } } - if !w.useDiskBuffer { - return nil - } - err := w.irFile.Close() if err != nil { return fmt.Errorf("error could not close IR file %s: %w", w.irPath, err) @@ -372,17 +297,14 @@ func (w *Writer) Close() error { // Getter for Zstd Output. // Returns: // - zstdOutput: Reader for writer Zstd output -func (w *Writer) GetZstdOutput() io.Reader { - if !w.useDiskBuffer { - return w.zstdMemBuf - } +func (w *diskWriter) GetZstdOutput() io.Reader { return w.zstdFile } // Getter for useDiskBuffer. // Returns: // - useDiskBuffer: On/off for disk buffering -func (w *Writer) GetUseDiskBuffer() bool { +func (w *diskWriter) GetUseDiskBuffer() bool { return w.useDiskBuffer } @@ -391,7 +313,7 @@ func (w *Writer) GetUseDiskBuffer() bool { // // Returns: // - err: Error nil buffer, error from Zstd Encoder, error from operations on file -func (w *Writer) flushIrBuffer() error { +func (w *diskWriter) flushIrBuffer() error { if (w.irFile == nil) || (w.zstdFile == nil) { return fmt.Errorf("error flush called with non-existent buffer") } @@ -437,25 +359,6 @@ func (w *Writer) flushIrBuffer() error { return nil } -// Writes log events to a IR Writer. -// -// Parameters: -// - irWriter: CLP IR writer to write each log event with -// - logEvents: A slice of log events to be encoded -// -// Returns: -// - err: error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) error { - for _, event := range logEvents { - _, err := irWriter.Write(event) - if err != nil { - err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return err - } - } - return nil -} - // Creates file buffers to hold logs prior to sending to s3. // // Parameters: @@ -509,36 +412,6 @@ func createFile(path string) (*os.File, error) { return f, nil } -// Opens a new [ir.Writer] and [zstd.Encoder]. -// -// Parameters: -// - zstdOutput: Output location for Zstd -// - timezone: Time zone of the log source -// - size: Byte length -// -// Returns: -// - irWriter: Writer for CLP IR -// - ZstdWriter: Writer for Zstd -// - err: Error opening IR/Zstd writer -func newIrZstdWriters( - zstdOutput io.Writer, - timezone string, - size int, -) (*ir.Writer, *zstd.Encoder, error) { - // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using - // FourByteEncoding as default encoding. - irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) - if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) - } - - zstdWriter, err := zstd.NewWriter(zstdOutput) - if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) - } - return irWriter, zstdWriter, err -} - // Opens IR and Zstd disk buffer files. Zstd file whence is [io.SeekEnd]. // // Parameters: @@ -568,3 +441,21 @@ func openBufferFiles(irPath string, zstdPath string) (*os.File, *os.File, error) return irFile, zstdFile, nil } + +// Get size of IR and Zstd files. In general, can use [irTotalBytes] to track size of IR file; +// however, [irTotalBytes] will only track writes by current process and will not have info for +// recovered stores. For recovered stores, must use stat to get size. [zstd] does not provide the +// amount of bytes written with each write. Therefore, cannot keep track of size with variable as +// implemented for IR with [IrTotalBytes]. Instead, must always use stat. +// +// Returns: +// - err: Error called with useDiskBuffer off, error calling stat +func (w *diskWriter) getIrFileSize() (int, error) { + irFileInfo, err := w.irFile.Stat() + if err != nil { + return 0, err + } + + irFileSize := int(irFileInfo.Size()) + return irFileSize, err +} diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go new file mode 100644 index 0000000..8b8248a --- /dev/null +++ b/internal/irzstd/memory.go @@ -0,0 +1,141 @@ +package irzstd + +import ( + "bytes" + "fmt" + "io" + + "github.com/klauspost/compress/zstd" + + "github.com/y-scope/clp-ffi-go/ffi" + "github.com/y-scope/clp-ffi-go/ir" +) + +// Converts log events into Zstd compressed IR . Log events provided to writer are immediately +// converted to Zstd compressed IR and stored in [MemoryWriter.ZstdBuffer]. After the Zstd buffer +// receives logs, they are immediately sent to s3. +type memoryWriter struct { + zstdBuffer *bytes.Buffer + irWriter *ir.Writer + size int + timezone string + zstdWriter *zstd.Encoder +} + +// Opens a new [MemoryWriter] with a memory buffer for Zstd Output. For use when use_disk_store is +// off. +// +// Parameters: +// - timezone: Time zone of the log source +// - size: Byte length +// +// Returns: +// - Writer: Writer for Zstd compressed IR +// - err: Error opening Zstd/IR writers +func NewMemoryWriter(timezone string, size int) (Writer, error) { + var zstdBuffer bytes.Buffer + irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) + if err != nil { + return nil, err + } + + writer := memoryWriter{ + size: size, + timezone: timezone, + irWriter: irWriter, + zstdWriter: zstdWriter, + zstdBuffer: &zstdBuffer, + } + + return &writer, nil +} + +// Converts log events to Zstd compressed IR and outputs to the Zstd buffer. +// +// Parameters: +// - logEvents: A slice of log events to be encoded +// +// Returns: +// - err: Error writing IR/Zstd, error flushing buffers +func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { + err := writeIr(w.irWriter, logEvents) + if err != nil { + return err + } + + _, err = w.irWriter.WriteTo(w.zstdWriter) + return err +} + +// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. After +// calling close, +// [Writer] must be reset prior to calling write. +// +// Returns: +// - err: Error flushing/closing buffers +func (w *memoryWriter) CloseStreams() error { + _, err := w.irWriter.CloseTo(w.zstdWriter) + if err != nil { + return err + } + + w.irWriter = nil + + err = w.zstdWriter.Close() + return err +} + +// Reinitialize [Writer] after calling [CloseStreams]. Resets individual IR and Zstd writers and +// associated buffers. +// +// Returns: +// - err: Error opening IR writer, error IR buffer not empty, error with type assertion +func (w *memoryWriter) Reset() error { + var err error + w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) + if err != nil { + return err + } + + w.zstdBuffer.Reset() + w.zstdWriter.Reset(w.zstdBuffer) + return nil +} + +// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. +// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. +// Instead, calling Len() on buffer. +// +// Returns: +// - size: Bytes written +// - err: Error called with useDiskBuffer off, error calling stat +func (w *memoryWriter) GetOutputSize() (int, error) { + return w.zstdBuffer.Len(), nil + +} + +// Closes [MemoryWriter]. Currently used during recovery only, and advise caution using elsewhere. +// Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not +// added. It is preferable to add postamble on recovery so that IR is in the same state +// (i.e. not terminated) for an abrupt crash and a graceful exit. Function does not call +// [zstd.Encoder.Close] as it does not explicitly free memory and may add undesirable null frame. +// +// Returns: +// - err: Error closing irWriter, error closing files +func (w *memoryWriter) Close() error { + if w.irWriter != nil { + err := w.irWriter.Serializer.Close() + if err != nil { + return fmt.Errorf("error could not close irWriter: %w", err) + } + } + + return nil +} + +// Getter for Zstd Output. +// Returns: +// - zstdOutput: Reader for writer Zstd output +func (w *memoryWriter) GetZstdOutput() io.Reader { + return w.zstdBuffer +} \ No newline at end of file diff --git a/internal/outctx/context.go b/internal/outctx/context.go index b8dfb21..6f3eb67 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -187,7 +187,7 @@ func (ctx *S3Context) newEventManager( size int, ) (*EventManager, error) { var err error - var writer *irzstd.Writer + var writer irzstd.Writer if ctx.Config.UseDiskBuffer { irPath, zstdPath := ctx.GetBufferFilePaths(tag) @@ -198,7 +198,7 @@ func (ctx *S3Context) newEventManager( zstdPath, ) } else { - writer, err = irzstd.NewMemWriter(ctx.Config.TimeZone, size) + writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size) } if err != nil { diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 8bb0653..0ecf85a 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -22,7 +22,7 @@ const s3TagKey = "fluentBitTag" type EventManager struct { Tag string Index int - Writer *irzstd.Writer + Writer irzstd.Writer } // Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index aa28607..9976bdf 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -51,6 +51,7 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i uploadCriteriaMet, err := checkUploadCriteriaMet( eventManager, + ctx.Config.UseDiskBuffer, ctx.Config.UploadSizeMb, ) if err != nil { @@ -181,12 +182,12 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // Returns: // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size -func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { - if !eventManager.Writer.GetUseDiskBuffer() { +func checkUploadCriteriaMet(eventManager *outctx.EventManager, useDiskBuffer bool, uploadSizeMb int) (bool, error) { + if !useDiskBuffer { return true, nil } - _, bufferSize, err := eventManager.Writer.GetFileSizes() + bufferSize, err := eventManager.Writer.GetOutputSize() if err != nil { return false, fmt.Errorf("error could not get size of buffer: %w", err) } From 3ff80f5ba419f1095b3250a11be1b977dad26727 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 18:21:45 +0000 Subject: [PATCH 04/13] refactor --- internal/irzstd/disk.go | 152 ++++++++---------- internal/irzstd/memory.go | 51 +++--- plugins/out_clp_s3/internal/flush/flush.go | 8 +- .../out_clp_s3/internal/recovery/recovery.go | 4 - 4 files changed, 102 insertions(+), 113 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 35ff90a..2a9ab2e 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -13,37 +13,36 @@ import ( "github.com/y-scope/clp-ffi-go/ir" ) -// 2 MB threshold to buffer IR before compressing to Zstd when use_disk_buffer is on. +// 2 MB threshold to buffer IR before compressing to Zstd. const irSizeThreshold = 2 << 20 -// Converts log events into Zstd compressed IR using -// "trash compactor" design. Log events are converted to uncompressed IR and buffered into "bins". -// Uncompressed IR represents uncompressed trash in "trash compactor". Once the bin is full, the -// bin is "compacted" into its own separate Zstd frame. The compressor is explicitly closed after -// receiving input terminating the Zstd frame. Stacks of Zstd frames are then sent to S3. For -// majority of runtime, log events are stored as a mixture uncompressed IR and compressed -// Zstd frames. A simpler approach would be to send all the events for one S3 upload to the -// streaming compressor and only close the stream when the upload size is reached. However, the -// streaming compressor will keep frames/blocks open in between receipt of Fluent Bit chunks. Open -// frames/blocks may not be recoverable after an abrupt crash. Closed frames on the other hand are -// valid Zstd and can be send to s3 on startup. It is not explicitly necessary to buffer IR into -// "bins" (i.e. Fluent Bit chunks could be directly "compacted"); however, if the chunks are -// small, the compression ratio would deteriorate. "Trash compactor" design provides protection from -// log loss during abrupt crashes and maintains a high compression ratio. +// Converts log events into Zstd compressed IR using "trash compactor" design. Log events are +// converted to uncompressed IR and buffered into "bins". Uncompressed IR represents uncompressed +// trash in "trash compactor". Once the bin is full, the bin is "compacted" into its own separate +// Zstd frame. The compressor is explicitly closed after receiving input terminating the Zstd frame. +// Stacks of Zstd frames are then sent to S3. For majority of runtime, log events are stored as a +// mixture uncompressed IR and compressed Zstd frames. A simpler approach would be to send all the +// events for one S3 upload to the streaming compressor and only close the stream when the upload +// size is reached. However, the streaming compressor will keep frames/blocks open in between +// receipt of Fluent Bit chunks. Open frames/blocks may not be recoverable after an abrupt crash. +// Closed frames on the other hand are valid Zstd and can be send to s3 on startup. It is not +// explicitly necessary to buffer IR into "bins" (i.e. Fluent Bit chunks could be directly +// "compacted"); however, if the chunks are small, the compression ratio would deteriorate. "Trash +// compactor" design provides protection from log loss during abrupt crashes and maintains a high +// compression ratio. type diskWriter struct { - useDiskBuffer bool - irPath string // Path variable for debugging - zstdPath string // Path variable for debugging - irFile *os.File - zstdFile *os.File - irWriter *ir.Writer - size int - timezone string - irTotalBytes int - zstdWriter *zstd.Encoder + irPath string // Path variable for debugging + zstdPath string // Path variable for debugging + irFile *os.File + zstdFile *os.File + irWriter *ir.Writer + size int + timezone string + irTotalBytes int + zstdWriter *zstd.Encoder } -// Opens a new [Writer] using disk files for IR and Zstd buffers. For use when use_disk_store +// Opens a new [diskWriter] using files for IR and Zstd buffers. For use when use_disk_store // is on. // // Parameters: @@ -53,14 +52,14 @@ type diskWriter struct { // - zstdPath: Path to Zstd disk buffer file // // Returns: -// - Writer: Writer for Zstd compressed IR +// - diskWriter: Disk writer for Zstd compressed IR // - err: Error creating new buffers, error opening Zstd/IR writers func NewDiskWriter( timezone string, size int, irPath string, zstdPath string, -) (Writer, error) { +) (*diskWriter, error) { irFile, zstdFile, err := newFileBuffers(irPath, zstdPath) if err != nil { return nil, err @@ -71,22 +70,21 @@ func NewDiskWriter( return nil, err } - writer := diskWriter{ - useDiskBuffer: true, - size: size, - timezone: timezone, - irPath: irPath, - irFile: irFile, - zstdPath: zstdPath, - zstdFile: zstdFile, - irWriter: irWriter, - zstdWriter: zstdWriter, + diskWriter := diskWriter{ + size: size, + timezone: timezone, + irPath: irPath, + irFile: irFile, + zstdPath: zstdPath, + zstdFile: zstdFile, + irWriter: irWriter, + zstdWriter: zstdWriter, } - return &writer, nil + return &diskWriter, nil } -// Recovers a [Writer] opening buffer files from a previous execution of output plugin. +// Recovers a [diskWriter] opening buffer files from a previous execution of output plugin. // Recovery of files necessitates that use_disk_store is on. IR preamble is removed for // recovered store. Avoid use with empty disk stores as there will be no preamble. // @@ -97,7 +95,7 @@ func NewDiskWriter( // - zstdPath: Path to Zstd disk buffer file // // Returns: -// - Writer: Writer for Zstd compressed IR +// - diskWriter: Disk writer for Zstd compressed IR // - err: Error opening buffers, error opening Zstd/IR writers, error getting file sizes func RecoverWriter( timezone string, @@ -115,19 +113,18 @@ func RecoverWriter( return nil, err } - writer := diskWriter{ - useDiskBuffer: true, - size: size, - timezone: timezone, - irPath: irPath, - irFile: irFile, - zstdPath: zstdPath, - zstdFile: zstdFile, - irWriter: irWriter, - zstdWriter: zstdWriter, + diskWriter := diskWriter{ + size: size, + timezone: timezone, + irPath: irPath, + irFile: irFile, + zstdPath: zstdPath, + zstdFile: zstdFile, + irWriter: irWriter, + zstdWriter: zstdWriter, } - irFileSize, err := writer.getIrFileSize() + irFileSize, err := diskWriter.getIrFileSize() if err != nil { return nil, fmt.Errorf("error getting size of disk buffer: %w", err) } @@ -135,15 +132,15 @@ func RecoverWriter( // During recovery, IR buffer may not be empty, so the size must be set. In addition, // the non-empty disk buffers already have existing preamble so remove it. Disk buffer // must have non-zero size or else would be deleted in recover. - writer.irTotalBytes = irFileSize + diskWriter.irTotalBytes = irFileSize irWriter.Reset() - return &writer, nil + return &diskWriter, nil } -// Converts log events to Zstd compressed IR and outputs to the Zstd buffer. IR may be temporarily +// Converts log events to Zstd compressed IR and outputs to the Zstd buffer. IR is temporarily // stored in the IR buffer until it surpasses [irSizeThreshold] with compression to Zstd pushed out -// to a later call. See [Writer] for more specific details on behaviour. +// to a later call. See [diskWriter] for more specific details on behaviour. // // Parameters: // - logEvents: A slice of log events to be encoded @@ -175,17 +172,14 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { return nil } -// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. If -// UseDiskBuffer is true, the IR buffer is also flushed before ending stream. After calling close, -// [Writer] must be reset prior to calling write. +// Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. +// The IR buffer is also flushed before ending stream. After calling close, +// [diskWriter] must be reset prior to calling write. // // Returns: // - err: Error flushing/closing buffers func (w *diskWriter) CloseStreams() error { - // IR buffer may not be empty, so must be flushed prior to adding trailing EndOfStream byte. If - // not using disk buffering, IR writer buffer should always be empty since it is always flushed - // to Zstd buffer on write. - + // IR buffer may not be empty, so must be flushed prior to adding trailing EndOfStream byte. err := w.flushIrBuffer() if err != nil { return fmt.Errorf("error flushing IR buffer: %w", err) @@ -211,11 +205,11 @@ func (w *diskWriter) CloseStreams() error { return nil } -// Reinitialize [Writer] after calling [CloseStreams]. Resets individual IR and Zstd writers and +// Reinitialize [diskWriter] after calling [CloseStreams]. Resets individual IR and Zstd writers and // associated buffers. // // Returns: -// - err: Error opening IR writer, error IR buffer not empty, error with type assertion +// - err: Error opening IR writer, error IR buffer not empty func (w *diskWriter) Reset() error { var err error w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) @@ -245,15 +239,13 @@ func (w *diskWriter) Reset() error { return nil } -// Get size of IR and Zstd files. In general, can use [irTotalBytes] to track size of IR file; -// however, [irTotalBytes] will only track writes by current process and will not have info for -// recovered stores. For recovered stores, must use stat to get size. [zstd] does not provide the -// amount of bytes written with each write. Therefore, cannot keep track of size with variable as -// implemented for IR with [IrTotalBytes]. Instead, must always use stat. +// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. +// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. +// Instead, must always use stat. // // Returns: -// - err: Error called with useDiskBuffer off, error calling stat -func (w *diskWriter) GetOutputSize() (int, error) { +// - err: Error calling stat +func (w *diskWriter) GetZstdOutputSize() (int, error) { zstdFileInfo, err := w.zstdFile.Stat() if err != nil { @@ -262,10 +254,10 @@ func (w *diskWriter) GetOutputSize() (int, error) { zstdFileSize := int(zstdFileInfo.Size()) - return zstdFileSize, err + return zstdFileSize, err } -// Closes [Writer]. Currently used during recovery only, and advise caution using elsewhere. +// Closes [diskWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state // (i.e. not terminated) for an abrupt crash and a graceful exit. Function does not call @@ -296,7 +288,7 @@ func (w *diskWriter) Close() error { // Getter for Zstd Output. // Returns: -// - zstdOutput: Reader for writer Zstd output +// - zstdOutput: Reader for Zstd output func (w *diskWriter) GetZstdOutput() io.Reader { return w.zstdFile } @@ -305,7 +297,7 @@ func (w *diskWriter) GetZstdOutput() io.Reader { // Returns: // - useDiskBuffer: On/off for disk buffering func (w *diskWriter) GetUseDiskBuffer() bool { - return w.useDiskBuffer + return true } // Compresses contents of the IR buffer and outputs it to the Zstd buffer. The IR buffer is then @@ -442,14 +434,12 @@ func openBufferFiles(irPath string, zstdPath string) (*os.File, *os.File, error) return irFile, zstdFile, nil } -// Get size of IR and Zstd files. In general, can use [irTotalBytes] to track size of IR file; +// Get size of IR file. In general, can use [irTotalBytes] to track size of IR file; // however, [irTotalBytes] will only track writes by current process and will not have info for -// recovered stores. For recovered stores, must use stat to get size. [zstd] does not provide the -// amount of bytes written with each write. Therefore, cannot keep track of size with variable as -// implemented for IR with [IrTotalBytes]. Instead, must always use stat. +// recovered stores. // // Returns: -// - err: Error called with useDiskBuffer off, error calling stat +// - err: Error calling stat func (w *diskWriter) getIrFileSize() (int, error) { irFileInfo, err := w.irFile.Stat() if err != nil { diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 8b8248a..7b2f8b7 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -11,7 +11,7 @@ import ( "github.com/y-scope/clp-ffi-go/ir" ) -// Converts log events into Zstd compressed IR . Log events provided to writer are immediately +// Converts log events into Zstd compressed IR. Log events provided to writer are immediately // converted to Zstd compressed IR and stored in [MemoryWriter.ZstdBuffer]. After the Zstd buffer // receives logs, they are immediately sent to s3. type memoryWriter struct { @@ -22,7 +22,7 @@ type memoryWriter struct { zstdWriter *zstd.Encoder } -// Opens a new [MemoryWriter] with a memory buffer for Zstd Output. For use when use_disk_store is +// Opens a new [MemoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is // off. // // Parameters: @@ -32,14 +32,14 @@ type memoryWriter struct { // Returns: // - Writer: Writer for Zstd compressed IR // - err: Error opening Zstd/IR writers -func NewMemoryWriter(timezone string, size int) (Writer, error) { +func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { var zstdBuffer bytes.Buffer irWriter, zstdWriter, err := newIrZstdWriters(&zstdBuffer, timezone, size) if err != nil { return nil, err } - writer := memoryWriter{ + memoryWriter := memoryWriter{ size: size, timezone: timezone, irWriter: irWriter, @@ -47,7 +47,7 @@ func NewMemoryWriter(timezone string, size int) (Writer, error) { zstdBuffer: &zstdBuffer, } - return &writer, nil + return &memoryWriter, nil } // Converts log events to Zstd compressed IR and outputs to the Zstd buffer. @@ -56,7 +56,7 @@ func NewMemoryWriter(timezone string, size int) (Writer, error) { // - logEvents: A slice of log events to be encoded // // Returns: -// - err: Error writing IR/Zstd, error flushing buffers +// - err: Error writing IR/Zstd func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { err := writeIr(w.irWriter, logEvents) if err != nil { @@ -68,11 +68,10 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { } // Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. After -// calling close, -// [Writer] must be reset prior to calling write. +// calling close, [memoryWriter] must be reset prior to calling write. // // Returns: -// - err: Error flushing/closing buffers +// - err: Error closing buffers func (w *memoryWriter) CloseStreams() error { _, err := w.irWriter.CloseTo(w.zstdWriter) if err != nil { @@ -85,11 +84,11 @@ func (w *memoryWriter) CloseStreams() error { return err } -// Reinitialize [Writer] after calling [CloseStreams]. Resets individual IR and Zstd writers and +// Reinitialize [memoryWriter] after calling [CloseStreams]. Resets individual IR and Zstd writers and // associated buffers. // // Returns: -// - err: Error opening IR writer, error IR buffer not empty, error with type assertion +// - err: Error opening IR writer func (w *memoryWriter) Reset() error { var err error w.irWriter, err = ir.NewWriterSize[ir.FourByteEncoding](w.size, w.timezone) @@ -102,16 +101,30 @@ func (w *memoryWriter) Reset() error { return nil } +// Getter for useDiskBuffer. +// +// Returns: +// - useDiskBuffer: On/off for disk buffering +func (w *memoryWriter) GetUseDiskBuffer() bool { + return false +} + +// Getter for Zstd Output. +// +// Returns: +// - zstdOutput: Reader for Zstd output +func (w *memoryWriter) GetZstdOutput() io.Reader { + return w.zstdBuffer +} + // Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. // Instead, calling Len() on buffer. // // Returns: // - size: Bytes written -// - err: Error called with useDiskBuffer off, error calling stat -func (w *memoryWriter) GetOutputSize() (int, error) { +// - err: nil error to comply with interface +func (w *memoryWriter) GetZstdOutputSize() (int, error) { return w.zstdBuffer.Len(), nil - } // Closes [MemoryWriter]. Currently used during recovery only, and advise caution using elsewhere. @@ -129,13 +142,5 @@ func (w *memoryWriter) Close() error { return fmt.Errorf("error could not close irWriter: %w", err) } } - return nil -} - -// Getter for Zstd Output. -// Returns: -// - zstdOutput: Reader for writer Zstd output -func (w *memoryWriter) GetZstdOutput() io.Reader { - return w.zstdBuffer } \ No newline at end of file diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 9976bdf..f93dde7 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -51,7 +51,6 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i uploadCriteriaMet, err := checkUploadCriteriaMet( eventManager, - ctx.Config.UseDiskBuffer, ctx.Config.UploadSizeMb, ) if err != nil { @@ -176,18 +175,17 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag -// - useDiskBuffer: On/off for disk buffering // - uploadSizeMb: S3 upload size in MB // // Returns: // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size -func checkUploadCriteriaMet(eventManager *outctx.EventManager, useDiskBuffer bool, uploadSizeMb int) (bool, error) { - if !useDiskBuffer { +func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { + if !eventManager.Writer.GetUseDiskBuffer() { return true, nil } - bufferSize, err := eventManager.Writer.GetOutputSize() + bufferSize, err := eventManager.Writer.GetZstdOutputSize() if err != nil { return false, fmt.Errorf("error could not get size of buffer: %w", err) } diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index a4fcff2..2683506 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -24,10 +24,6 @@ import ( // Returns: // - err: Error closing file func GracefulExit(ctx *outctx.S3Context) error { - if !ctx.Config.UseDiskBuffer { - return nil - } - for _, eventManager := range ctx.EventManagers { err := eventManager.Writer.Close() if err != nil { From 9f9f20eba67e0811cba045ab75078d166c0de8d0 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 18:49:16 +0000 Subject: [PATCH 05/13] refactor --- internal/irzstd/disk.go | 63 ++++++++++++++++++++------------------- internal/irzstd/memory.go | 6 ++-- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 2a9ab2e..1ce7cd2 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -102,7 +102,7 @@ func RecoverWriter( size int, irPath string, zstdPath string, -) (Writer, error) { +) (*diskWriter, error) { irFile, zstdFile, err := openBufferFiles(irPath, zstdPath) if err != nil { return nil, fmt.Errorf("error opening files: %w", err) @@ -126,7 +126,7 @@ func RecoverWriter( irFileSize, err := diskWriter.getIrFileSize() if err != nil { - return nil, fmt.Errorf("error getting size of disk buffer: %w", err) + return nil, fmt.Errorf("error getting size of IR file: %w", err) } // During recovery, IR buffer may not be empty, so the size must be set. In addition, @@ -138,8 +138,8 @@ func RecoverWriter( return &diskWriter, nil } -// Converts log events to Zstd compressed IR and outputs to the Zstd buffer. IR is temporarily -// stored in the IR buffer until it surpasses [irSizeThreshold] with compression to Zstd pushed out +// Converts log events to Zstd compressed IR and outputs to the Zstd file. IR is temporarily +// stored in the IR file until it surpasses [irSizeThreshold] with compression to Zstd pushed out // to a later call. See [diskWriter] for more specific details on behaviour. // // Parameters: @@ -205,7 +205,7 @@ func (w *diskWriter) CloseStreams() error { return nil } -// Reinitialize [diskWriter] after calling [CloseStreams]. Resets individual IR and Zstd writers and +// Reinitialize [diskWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and // associated buffers. // // Returns: @@ -239,24 +239,6 @@ func (w *diskWriter) Reset() error { return nil } -// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. -// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. -// Instead, must always use stat. -// -// Returns: -// - err: Error calling stat -func (w *diskWriter) GetZstdOutputSize() (int, error) { - - zstdFileInfo, err := w.zstdFile.Stat() - if err != nil { - return 0, err - } - - zstdFileSize := int(zstdFileInfo.Size()) - - return zstdFileSize, err -} - // Closes [diskWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state @@ -286,25 +268,46 @@ func (w *diskWriter) Close() error { return nil } +// Getter for useDiskBuffer. +// +// Returns: +// - useDiskBuffer: On/off for disk buffering +func (w *diskWriter) GetUseDiskBuffer() bool { + return true +} + + // Getter for Zstd Output. +// // Returns: // - zstdOutput: Reader for Zstd output func (w *diskWriter) GetZstdOutput() io.Reader { return w.zstdFile } -// Getter for useDiskBuffer. +// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. +// Therefore, cannot keep track of size with variable as implemented for IR with [IrTotalBytes]. +// Instead, must always use stat. +// // Returns: -// - useDiskBuffer: On/off for disk buffering -func (w *diskWriter) GetUseDiskBuffer() bool { - return true +// - err: Error calling stat +func (w *diskWriter) GetZstdOutputSize() (int, error) { + + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return 0, err + } + + zstdFileSize := int(zstdFileInfo.Size()) + + return zstdFileSize, err } -// Compresses contents of the IR buffer and outputs it to the Zstd buffer. The IR buffer is then -// reset. +// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then +// truncated. // // Returns: -// - err: Error nil buffer, error from Zstd Encoder, error from operations on file +// - err: Error nil files, error from Zstd Encoder, error from operations on file func (w *diskWriter) flushIrBuffer() error { if (w.irFile == nil) || (w.zstdFile == nil) { return fmt.Errorf("error flush called with non-existent buffer") diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 7b2f8b7..0d6b3e6 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -22,7 +22,7 @@ type memoryWriter struct { zstdWriter *zstd.Encoder } -// Opens a new [MemoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is +// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is // off. // // Parameters: @@ -84,7 +84,7 @@ func (w *memoryWriter) CloseStreams() error { return err } -// Reinitialize [memoryWriter] after calling [CloseStreams]. Resets individual IR and Zstd writers and +// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and // associated buffers. // // Returns: @@ -127,7 +127,7 @@ func (w *memoryWriter) GetZstdOutputSize() (int, error) { return w.zstdBuffer.Len(), nil } -// Closes [MemoryWriter]. Currently used during recovery only, and advise caution using elsewhere. +// Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state // (i.e. not terminated) for an abrupt crash and a graceful exit. Function does not call From a439aca25162a68f53236d3184665a590d381fdd Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 18:55:18 +0000 Subject: [PATCH 06/13] refactor --- internal/irzstd/disk.go | 2 -- internal/irzstd/memory.go | 18 +++++++++--------- plugins/out_clp_s3/internal/flush/flush.go | 2 +- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 1ce7cd2..b56f73e 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -276,7 +276,6 @@ func (w *diskWriter) GetUseDiskBuffer() bool { return true } - // Getter for Zstd Output. // // Returns: @@ -292,7 +291,6 @@ func (w *diskWriter) GetZstdOutput() io.Reader { // Returns: // - err: Error calling stat func (w *diskWriter) GetZstdOutputSize() (int, error) { - zstdFileInfo, err := w.zstdFile.Stat() if err != nil { return 0, err diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 0d6b3e6..82e527e 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -15,11 +15,11 @@ import ( // converted to Zstd compressed IR and stored in [MemoryWriter.ZstdBuffer]. After the Zstd buffer // receives logs, they are immediately sent to s3. type memoryWriter struct { - zstdBuffer *bytes.Buffer - irWriter *ir.Writer - size int - timezone string - zstdWriter *zstd.Encoder + zstdBuffer *bytes.Buffer + irWriter *ir.Writer + size int + timezone string + zstdWriter *zstd.Encoder } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is @@ -84,8 +84,8 @@ func (w *memoryWriter) CloseStreams() error { return err } -// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers and -// associated buffers. +// Reinitialize [memoryWriter] after calling CloseStreams(). Resets individual IR and Zstd writers +// and associated buffers. // // Returns: // - err: Error opening IR writer @@ -121,7 +121,7 @@ func (w *memoryWriter) GetZstdOutput() io.Reader { // Instead, calling Len() on buffer. // // Returns: -// - size: Bytes written +// - size: Bytes written // - err: nil error to comply with interface func (w *memoryWriter) GetZstdOutputSize() (int, error) { return w.zstdBuffer.Len(), nil @@ -143,4 +143,4 @@ func (w *memoryWriter) Close() error { } } return nil -} \ No newline at end of file +} diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index f93dde7..54fdd32 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -180,7 +180,7 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // Returns: // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size -func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { +func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { if !eventManager.Writer.GetUseDiskBuffer() { return true, nil } From 7695e5f2c02c66044401bc0a506fcc66b5534b05 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 19:01:45 +0000 Subject: [PATCH 07/13] refactor --- internal/irzstd/memory.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 82e527e..2f6674f 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -12,7 +12,7 @@ import ( ) // Converts log events into Zstd compressed IR. Log events provided to writer are immediately -// converted to Zstd compressed IR and stored in [MemoryWriter.ZstdBuffer]. After the Zstd buffer +// converted to Zstd compressed IR and stored in [memoryWriter.ZstdBuffer]. After the Zstd buffer // receives logs, they are immediately sent to s3. type memoryWriter struct { zstdBuffer *bytes.Buffer @@ -30,7 +30,7 @@ type memoryWriter struct { // - size: Byte length // // Returns: -// - Writer: Writer for Zstd compressed IR +// - memoryWriter: Memory writer for Zstd compressed IR // - err: Error opening Zstd/IR writers func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { var zstdBuffer bytes.Buffer From c9a93c3dc76ac35ee2ae547b80488d75ed8c8ffe Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 19:21:24 +0000 Subject: [PATCH 08/13] add missing file --- internal/irzstd/irzstd.go | 112 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 internal/irzstd/irzstd.go diff --git a/internal/irzstd/irzstd.go b/internal/irzstd/irzstd.go new file mode 100644 index 0000000..3a06d3e --- /dev/null +++ b/internal/irzstd/irzstd.go @@ -0,0 +1,112 @@ +// Package implements writer that converts log events to Zstd compressed IR. Effectively chaining +// together [ir.Writer] and [zstd.Encoder] in series. + +package irzstd + +import ( + "fmt" + "io" + + "github.com/klauspost/compress/zstd" + + "github.com/y-scope/clp-ffi-go/ffi" + "github.com/y-scope/clp-ffi-go/ir" +) + +type Writer interface { + // Converts log events to Zstd compressed IR and outputs to the Zstd buffer. + // + // Parameters: + // - logEvents: A slice of log events to be encoded + // + // Returns: + // - err + WriteIrZstd([]ffi.LogEvent) error + + // Closes IR stream and Zstd frame. After calling close, Writer must be Reset() prior to calling + // write. + // + // Returns: + // - err + CloseStreams() error + + // Closes Writer. + // + // Returns: + // - err + Close() error + + // Reinitialize Writer after calling CloseStreams(). + // + // Returns: + // - err + Reset() error + + // Getter for useDiskBuffer. + // + // Returns: + // - useDiskBuffer: On/off for disk buffering + GetUseDiskBuffer() bool + + // Getter for Zstd Output. + // + // Returns: + // - zstdOutput: Reader for Zstd output + GetZstdOutput() io.Reader + + // Get size of Zstd output. + // + // Returns: + // - size: Bytes written + // - err + GetZstdOutputSize() (int, error) +} + +// Writes log events to a IR Writer. +// +// Parameters: +// - irWriter: CLP IR writer to write each log event with +// - logEvents: A slice of log events to be encoded +// +// Returns: +// - err: error if an event could not be written +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) error { + for _, event := range logEvents { + _, err := irWriter.Write(event) + if err != nil { + err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) + return err + } + } + return nil +} + +// Opens a new [ir.Writer] and [zstd.Encoder]. +// +// Parameters: +// - zstdOutput: Output location for Zstd +// - timezone: Time zone of the log source +// - size: Byte length +// +// Returns: +// - irWriter: Writer for CLP IR +// - ZstdWriter: Writer for Zstd +// - err: Error opening IR/Zstd writer +func newIrZstdWriters( + zstdOutput io.Writer, + timezone string, + size int, +) (*ir.Writer, *zstd.Encoder, error) { + // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using + // FourByteEncoding as default encoding. + irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) + if err != nil { + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + } + + zstdWriter, err := zstd.NewWriter(zstdOutput) + if err != nil { + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + } + return irWriter, zstdWriter, err +} From 15cef304893b28c94475ec4a69a5548b00f72628 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 19:23:37 +0000 Subject: [PATCH 09/13] rename file so git tracking is better --- internal/irzstd/writer.go | 112 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 internal/irzstd/writer.go diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go new file mode 100644 index 0000000..3a06d3e --- /dev/null +++ b/internal/irzstd/writer.go @@ -0,0 +1,112 @@ +// Package implements writer that converts log events to Zstd compressed IR. Effectively chaining +// together [ir.Writer] and [zstd.Encoder] in series. + +package irzstd + +import ( + "fmt" + "io" + + "github.com/klauspost/compress/zstd" + + "github.com/y-scope/clp-ffi-go/ffi" + "github.com/y-scope/clp-ffi-go/ir" +) + +type Writer interface { + // Converts log events to Zstd compressed IR and outputs to the Zstd buffer. + // + // Parameters: + // - logEvents: A slice of log events to be encoded + // + // Returns: + // - err + WriteIrZstd([]ffi.LogEvent) error + + // Closes IR stream and Zstd frame. After calling close, Writer must be Reset() prior to calling + // write. + // + // Returns: + // - err + CloseStreams() error + + // Closes Writer. + // + // Returns: + // - err + Close() error + + // Reinitialize Writer after calling CloseStreams(). + // + // Returns: + // - err + Reset() error + + // Getter for useDiskBuffer. + // + // Returns: + // - useDiskBuffer: On/off for disk buffering + GetUseDiskBuffer() bool + + // Getter for Zstd Output. + // + // Returns: + // - zstdOutput: Reader for Zstd output + GetZstdOutput() io.Reader + + // Get size of Zstd output. + // + // Returns: + // - size: Bytes written + // - err + GetZstdOutputSize() (int, error) +} + +// Writes log events to a IR Writer. +// +// Parameters: +// - irWriter: CLP IR writer to write each log event with +// - logEvents: A slice of log events to be encoded +// +// Returns: +// - err: error if an event could not be written +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) error { + for _, event := range logEvents { + _, err := irWriter.Write(event) + if err != nil { + err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) + return err + } + } + return nil +} + +// Opens a new [ir.Writer] and [zstd.Encoder]. +// +// Parameters: +// - zstdOutput: Output location for Zstd +// - timezone: Time zone of the log source +// - size: Byte length +// +// Returns: +// - irWriter: Writer for CLP IR +// - ZstdWriter: Writer for Zstd +// - err: Error opening IR/Zstd writer +func newIrZstdWriters( + zstdOutput io.Writer, + timezone string, + size int, +) (*ir.Writer, *zstd.Encoder, error) { + // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using + // FourByteEncoding as default encoding. + irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) + if err != nil { + return nil, nil, fmt.Errorf("error opening IR writer: %w", err) + } + + zstdWriter, err := zstd.NewWriter(zstdOutput) + if err != nil { + return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) + } + return irWriter, zstdWriter, err +} From 15b6e01300f5dabcdcd91b5a60d4503811ffedba Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Tue, 13 Aug 2024 19:25:21 +0000 Subject: [PATCH 10/13] delete old file --- internal/irzstd/irzstd.go | 112 -------------------------------------- 1 file changed, 112 deletions(-) delete mode 100644 internal/irzstd/irzstd.go diff --git a/internal/irzstd/irzstd.go b/internal/irzstd/irzstd.go deleted file mode 100644 index 3a06d3e..0000000 --- a/internal/irzstd/irzstd.go +++ /dev/null @@ -1,112 +0,0 @@ -// Package implements writer that converts log events to Zstd compressed IR. Effectively chaining -// together [ir.Writer] and [zstd.Encoder] in series. - -package irzstd - -import ( - "fmt" - "io" - - "github.com/klauspost/compress/zstd" - - "github.com/y-scope/clp-ffi-go/ffi" - "github.com/y-scope/clp-ffi-go/ir" -) - -type Writer interface { - // Converts log events to Zstd compressed IR and outputs to the Zstd buffer. - // - // Parameters: - // - logEvents: A slice of log events to be encoded - // - // Returns: - // - err - WriteIrZstd([]ffi.LogEvent) error - - // Closes IR stream and Zstd frame. After calling close, Writer must be Reset() prior to calling - // write. - // - // Returns: - // - err - CloseStreams() error - - // Closes Writer. - // - // Returns: - // - err - Close() error - - // Reinitialize Writer after calling CloseStreams(). - // - // Returns: - // - err - Reset() error - - // Getter for useDiskBuffer. - // - // Returns: - // - useDiskBuffer: On/off for disk buffering - GetUseDiskBuffer() bool - - // Getter for Zstd Output. - // - // Returns: - // - zstdOutput: Reader for Zstd output - GetZstdOutput() io.Reader - - // Get size of Zstd output. - // - // Returns: - // - size: Bytes written - // - err - GetZstdOutputSize() (int, error) -} - -// Writes log events to a IR Writer. -// -// Parameters: -// - irWriter: CLP IR writer to write each log event with -// - logEvents: A slice of log events to be encoded -// -// Returns: -// - err: error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) error { - for _, event := range logEvents { - _, err := irWriter.Write(event) - if err != nil { - err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return err - } - } - return nil -} - -// Opens a new [ir.Writer] and [zstd.Encoder]. -// -// Parameters: -// - zstdOutput: Output location for Zstd -// - timezone: Time zone of the log source -// - size: Byte length -// -// Returns: -// - irWriter: Writer for CLP IR -// - ZstdWriter: Writer for Zstd -// - err: Error opening IR/Zstd writer -func newIrZstdWriters( - zstdOutput io.Writer, - timezone string, - size int, -) (*ir.Writer, *zstd.Encoder, error) { - // IR buffer using bytes.Buffer internally, so it will dynamically grow if undersized. Using - // FourByteEncoding as default encoding. - irWriter, err := ir.NewWriterSize[ir.FourByteEncoding](size, timezone) - if err != nil { - return nil, nil, fmt.Errorf("error opening IR writer: %w", err) - } - - zstdWriter, err := zstd.NewWriter(zstdOutput) - if err != nil { - return nil, nil, fmt.Errorf("error opening Zstd writer: %w", err) - } - return irWriter, zstdWriter, err -} From b47ba7860a1262f9e8a426ae01eb4b8736be69b1 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Thu, 15 Aug 2024 21:31:51 +0000 Subject: [PATCH 11/13] made change so write returns number of events --- internal/irzstd/disk.go | 13 +++++++------ internal/irzstd/memory.go | 9 +++++---- internal/irzstd/writer.go | 14 +++++++++----- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index b56f73e..0d1cf59 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -146,16 +146,17 @@ func RecoverWriter( // - logEvents: A slice of log events to be encoded // // Returns: +// - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd, error flushing buffers -func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { - err := writeIr(w.irWriter, logEvents) +func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { + numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { - return err + return numEvents, err } numBytes, err := w.irWriter.WriteTo(w.irFile) if err != nil { - return err + return numEvents, err } w.irTotalBytes += int(numBytes) @@ -165,11 +166,11 @@ func (w *diskWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { if w.irTotalBytes >= irSizeThreshold { err := w.flushIrBuffer() if err != nil { - return fmt.Errorf("error flushing IR buffer: %w", err) + return numEvents, fmt.Errorf("error flushing IR buffer: %w", err) } } - return nil + return numEvents, nil } // Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 2f6674f..5d8816b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -56,15 +56,16 @@ func NewMemoryWriter(timezone string, size int) (*memoryWriter, error) { // - logEvents: A slice of log events to be encoded // // Returns: +// - numEvents: Number of log events successfully written to IR writer buffer // - err: Error writing IR/Zstd -func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) error { - err := writeIr(w.irWriter, logEvents) +func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { + numEvents, err := writeIr(w.irWriter, logEvents) if err != nil { - return err + return numEvents, err } _, err = w.irWriter.WriteTo(w.zstdWriter) - return err + return numEvents, err } // Closes IR stream and Zstd frame. Add trailing byte(s) required for IR/Zstd decoding. After diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index 3a06d3e..eb43758 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -20,8 +20,9 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: + // - numEvents: Number of log events successfully written to IR writer buffer // - err - WriteIrZstd([]ffi.LogEvent) error + WriteIrZstd([]ffi.LogEvent) (int, error) // Closes IR stream and Zstd frame. After calling close, Writer must be Reset() prior to calling // write. @@ -69,16 +70,19 @@ type Writer interface { // - logEvents: A slice of log events to be encoded // // Returns: -// - err: error if an event could not be written -func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) error { +// - numEvents: Number of log events successfully written to IR writer buffer +// - err: Error if an event could not be written +func writeIr(irWriter *ir.Writer, logEvents []ffi.LogEvent) (int, error) { + var numEvents int for _, event := range logEvents { _, err := irWriter.Write(event) if err != nil { err = fmt.Errorf("failed to encode event %v into ir: %w", event, err) - return err + return numEvents, err } + numEvents += 1 } - return nil + return numEvents, nil } // Opens a new [ir.Writer] and [zstd.Encoder]. From 09da6b9f76250604c978ed7dec8cdbcd330b256e Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Thu, 15 Aug 2024 21:36:03 +0000 Subject: [PATCH 12/13] silence return value in flush --- plugins/out_clp_s3/internal/flush/flush.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 54fdd32..5204aac 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -44,7 +44,7 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } - err = eventManager.Writer.WriteIrZstd(logEvents) + _, err = eventManager.Writer.WriteIrZstd(logEvents) if err != nil { return output.FLB_ERROR, err } From 8bb2abea2e140a5ea1f97de51918c9370eee1484 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Thu, 15 Aug 2024 21:41:59 +0000 Subject: [PATCH 13/13] add log on error --- plugins/out_clp_s3/internal/flush/flush.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 5204aac..714be4f 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -44,8 +44,14 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } - _, err = eventManager.Writer.WriteIrZstd(logEvents) + numEvents, err := eventManager.Writer.WriteIrZstd(logEvents) if err != nil { + log.Printf( + "Wrote %d out of %d total log events for tag %s", + numEvents, + len(logEvents), + eventManager.Tag, + ) return output.FLB_ERROR, err }