diff --git a/internal/irzstd/irzstd.go b/internal/irzstd/irzstd.go index 10c3ebc..509c3af 100644 --- a/internal/irzstd/irzstd.go +++ b/internal/irzstd/irzstd.go @@ -50,6 +50,7 @@ type Writer struct { irWriter *ir.Writer size int timezone string + tagKey string irTotalBytes int zstdWriter *zstd.Encoder } @@ -69,6 +70,7 @@ type Writer struct { func NewWriter( timezone string, size int, + tagKey string, useDiskBuffer bool, irBuffer io.ReadWriter, zstdBuffer io.ReadWriter, @@ -88,6 +90,7 @@ func NewWriter( writer := Writer{ useDiskBuffer: useDiskBuffer, size: size, + tagKey: tagKey, timezone: timezone, irBuffer: irBuffer, zstdBuffer: zstdBuffer, @@ -167,13 +170,13 @@ func (w *Writer) Close() error { return err } - zstdFile, ok := w.zstdBuffer.(*os.File) - if !ok { - return fmt.Errorf("error type assertion from buffer to os.File failed") + if !w.useDiskBuffer { + return nil } - if w.useDiskBuffer { - zstdFile.Seek(0, io.SeekStart) + err = diskBufferSeek(w.zstdBuffer, 0, io.SeekStart) + if err != nil { + return err } return nil @@ -260,16 +263,14 @@ func (w *Writer) flushIrBuffer() error { return nil } - irFile, ok := w.irBuffer.(*os.File) - if !ok { - return fmt.Errorf("error type assertion from buffer to os.File failed") - } - - log.Printf("flushing IR buffer %s", irFile.Name()) + log.Printf("flushing IR buffer %s", w.tagKey) - irFile.Seek(0, io.SeekStart) + err := diskBufferSeek(w.irBuffer, 0, io.SeekStart) + if err != nil { + return err + } - _, err := io.Copy(w.zstdWriter, w.irBuffer) + _, err = io.Copy(w.zstdWriter, w.irBuffer) if err != nil { return err } @@ -338,3 +339,26 @@ func truncateDiskBuffer(diskBuffer io.ReadWriter) error { return nil } + +// Seek for disk buffer. +// +// Parameters: +// - diskBuffer: Buffer file +// - offset: Byte offset +// - whence: Seek whence values. Either io.SeekStart, io.SeekCurrent, and io.SeekEnd. +// +// Returns: +// - err: error with type assertion, error with truncate +func diskBufferSeek(diskBuffer io.ReadWriter, offset int64, whence int) error { + file, ok := diskBuffer.(*os.File) + if !ok { + return fmt.Errorf("error type assertion from buffer to os.File failed") + } + + _, err := file.Seek(offset, whence) + if err != nil { + return fmt.Errorf("error seeking disk buffer: %w", err) + } + + return nil +} diff --git a/plugins/out_clp_s3/flush/flush.go b/plugins/out_clp_s3/flush/flush.go index 75820c8..e5fc0c0 100644 --- a/plugins/out_clp_s3/flush/flush.go +++ b/plugins/out_clp_s3/flush/flush.go @@ -113,7 +113,7 @@ func newTag( irBuffer io.ReadWriter, zstdBuffer io.ReadWriter, ) (*outctx.Tag, error) { - writer, err := irzstd.NewWriter(timezone, size, useDiskBuffer, irBuffer, zstdBuffer) + writer, err := irzstd.NewWriter(timezone, size, tagKey, useDiskBuffer, irBuffer, zstdBuffer) if err != nil { return nil, err }