Skip to content

Commit

Permalink
fixed seek error catch
Browse files Browse the repository at this point in the history
  • Loading branch information
davemarco committed Jul 26, 2024
1 parent 87a419d commit 333b288
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
50 changes: 37 additions & 13 deletions internal/irzstd/irzstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Writer struct {
irWriter *ir.Writer
size int
timezone string
tagKey string
irTotalBytes int
zstdWriter *zstd.Encoder
}
Expand All @@ -69,6 +70,7 @@ type Writer struct {
func NewWriter(
timezone string,
size int,
tagKey string,
useDiskBuffer bool,
irBuffer io.ReadWriter,
zstdBuffer io.ReadWriter,
Expand All @@ -88,6 +90,7 @@ func NewWriter(
writer := Writer{
useDiskBuffer: useDiskBuffer,
size: size,
tagKey: tagKey,
timezone: timezone,
irBuffer: irBuffer,
zstdBuffer: zstdBuffer,
Expand Down Expand Up @@ -167,13 +170,13 @@ func (w *Writer) Close() error {
return err
}

zstdFile, ok := w.zstdBuffer.(*os.File)
if !ok {
return fmt.Errorf("error type assertion from buffer to os.File failed")
if !w.useDiskBuffer {
return nil
}

if w.useDiskBuffer {
zstdFile.Seek(0, io.SeekStart)
err = diskBufferSeek(w.zstdBuffer, 0, io.SeekStart)
if err != nil {
return err
}

return nil
Expand Down Expand Up @@ -260,16 +263,14 @@ func (w *Writer) flushIrBuffer() error {
return nil
}

irFile, ok := w.irBuffer.(*os.File)
if !ok {
return fmt.Errorf("error type assertion from buffer to os.File failed")
}

log.Printf("flushing IR buffer %s", irFile.Name())
log.Printf("flushing IR buffer %s", w.tagKey)

irFile.Seek(0, io.SeekStart)
err := diskBufferSeek(w.irBuffer, 0, io.SeekStart)
if err != nil {
return err
}

_, err := io.Copy(w.zstdWriter, w.irBuffer)
_, err = io.Copy(w.zstdWriter, w.irBuffer)
if err != nil {
return err
}
Expand Down Expand Up @@ -338,3 +339,26 @@ func truncateDiskBuffer(diskBuffer io.ReadWriter) error {

return nil
}

// Seek for disk buffer.
//
// Parameters:
// - diskBuffer: Buffer file
// - offset: Byte offset
// - whence: Seek whence values. Either io.SeekStart, io.SeekCurrent, and io.SeekEnd.
//
// Returns:
// - err: error with type assertion, error with truncate
func diskBufferSeek(diskBuffer io.ReadWriter, offset int64, whence int) error {
file, ok := diskBuffer.(*os.File)
if !ok {
return fmt.Errorf("error type assertion from buffer to os.File failed")
}

_, err := file.Seek(offset, whence)
if err != nil {
return fmt.Errorf("error seeking disk buffer: %w", err)
}

return nil
}
2 changes: 1 addition & 1 deletion plugins/out_clp_s3/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newTag(
irBuffer io.ReadWriter,
zstdBuffer io.ReadWriter,
) (*outctx.Tag, error) {
writer, err := irzstd.NewWriter(timezone, size, useDiskBuffer, irBuffer, zstdBuffer)
writer, err := irzstd.NewWriter(timezone, size, tagKey, useDiskBuffer, irBuffer, zstdBuffer)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 333b288

Please sign in to comment.