lz4: start read from offset
DmitryRomanov committed Jan 17, 2025
1 parent b724b32 commit b1569e2
Showing 2 changed files with 75 additions and 24 deletions.
47 changes: 31 additions & 16 deletions plugin/input/file/provider.go
@@ -60,13 +60,15 @@ type jobProvider struct {
}

type Job struct {
file *os.File
inode inodeID
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
filename string
symlink string
curOffset int64 // offset to not call Seek() everytime
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
file *os.File
mimeType string
isCompressed bool
inode inodeID
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
filename string
symlink string
curOffset int64 // offset to not call Seek() everytime
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start

ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
mu *sync.Mutex
}

func (j *Job) seek(offset int64, whence int, hint string) int64 {
n, err := j.file.Seek(offset, whence)
if err != nil {
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
var err error
if !j.isCompressed {
n, err = j.file.Seek(offset, whence)
if err != nil {
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
}
} else {
n = 0
}
j.curOffset = n
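
For a compressed job, seek() now reports offset 0 instead of calling file.Seek(): an lz4 frame stream has no random access, so reaching a given decompressed offset generally means reading and discarding from the beginning. A minimal sketch of how a decompressed-offset counter could be kept next to such a stream (countingReader and the import path github.com/pierrec/lz4/v4 are illustrative assumptions, not code from this commit):

package main

import (
    "io"
    "os"

    "github.com/pierrec/lz4/v4" // assumed lz4 package; the repo may use a different one
)

// countingReader tracks how many decompressed bytes have been read,
// since an lz4 reader offers Read but no Seek.
type countingReader struct {
    r io.Reader
    n int64 // decompressed offset consumed so far
}

func (c *countingReader) Read(p []byte) (int, error) {
    n, err := c.r.Read(p)
    c.n += int64(n)
    return n, err
}

func main() {
    f, err := os.Open("app.log.lz4") // illustrative path
    if err != nil {
        panic(err)
    }
    defer f.Close()

    cr := &countingReader{r: lz4.NewReader(f)}
    buf := make([]byte, 16*1024)
    for {
        if _, err := cr.Read(buf); err != nil {
            break // io.EOF or a real read error
        }
    }
    _ = cr.n // decompressed offset reached; a job could report this instead of Seek()
}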

@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
}
}

func isCompressed(mimeType string) bool {
return mimeType == "application/x-lz4"
}
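
isCompressed matches on the MIME type string rather than on the file extension. The standard library's mime package has no built-in entry for .lz4, so if getMimeType is based on mime.TypeByExtension (worker.go imports "mime"), the mapping presumably has to be registered explicitly somewhere; a hedged sketch of that idea, not code taken from this repository:

package main

import (
    "fmt"
    "mime"
    "path/filepath"
)

func isCompressed(mimeType string) bool {
    return mimeType == "application/x-lz4"
}

func main() {
    // Assumption: the .lz4 extension is registered by hand, since the
    // default extension table does not know it.
    _ = mime.AddExtensionType(".lz4", "application/x-lz4")

    for _, name := range []string{"app.log", "app.log.lz4"} {
        mt := mime.TypeByExtension(filepath.Ext(name))
        fmt.Printf("%s -> %q compressed=%v\n", name, mt, isCompressed(mt))
    }
}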

func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
sourceID := sourceIDByStat(stat, symlink)

@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
}

inode := getInode(stat)
mimeType := getMimeType(filename)

job := &Job{
file: file,
inode: inode,
filename: filename,
symlink: symlink,
sourceID: sourceID,
file: file,
isCompressed: isCompressed(mimeType),
mimeType: mimeType,
inode: inode,
filename: filename,
symlink: symlink,
sourceID: sourceID,

isVirgin: true,
isDone: true,
52 changes: 44 additions & 8 deletions plugin/input/file/worker.go
@@ -5,6 +5,7 @@ import (
"io"
"mime"
"os"
"os/exec"
"path/filepath"
"strings"

@@ -89,11 +90,24 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
}
}

mimeType := getMimeType(file.Name())
var reader io.Reader

if mimeType == "application/x-lz4" {
if job.mimeType == "application/x-lz4" {
if isNotFileBeingWritten(file.Name()) {
logger.Error("cannot lock file", zap.String("filename", file.Name()))
break
}
lz4Reader := lz4.NewReader(file)
if len(offsets) > 0 {
for lastOffset+int64(readBufferSize) < offsets[0].Offset {
n, err := lz4Reader.Read(readBuf)
if err != nil {
if err == io.EOF {
break // End of file reached
}
}
lastOffset += int64(n)
}
}
reader = lz4Reader
} else {
reader = file
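
The loop above walks the decompressed stream in readBuf-sized chunks and stops once it is within one read buffer of the first saved offset, leaving the remaining bytes to the normal read path; a read error other than io.EOF is not acted on, so the loop simply keeps reading. A sketch of an alternative (not the commit's code) that discards exactly the saved number of decompressed bytes and surfaces any error, assuming an io.Reader over the decompressed stream and illustrative names:

// skipToOffset discards targetOffset decompressed bytes from r.
// r would be the lz4 reader; targetOffset the offset stored for the job.
func skipToOffset(r io.Reader, targetOffset int64) (int64, error) {
    skipped, err := io.CopyN(io.Discard, r, targetOffset)
    if err == io.EOF {
        // Stream ended before the stored offset; start from what is there.
        return skipped, nil
    }
    return skipped, err
}
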
@@ -196,17 +210,39 @@ func getMimeType(filename string) string {
return mimeType
}

func isNotFileBeingWritten(filePath string) bool {
// Run the lsof command to check open file descriptors
cmd := exec.Command("lsof", filePath)
output, err := cmd.Output()
if err != nil {
return false // Error running lsof
}

// Check the output for write access
lines := strings.Split(string(output), "\n")
for _, line := range lines {
// Check if the line contains 'w' indicating write access
if strings.Contains(line, "w") {
return true // File is being written to
}
}

return false // File is not being written to
}
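
Despite its name, isNotFileBeingWritten returns true when the lsof output contains a 'w' anywhere in a line, i.e. when some process appears to hold the file open for writing; matching 'w' against the whole line can also fire on unrelated text such as command names, user names, or the cwd descriptor. A hedged refinement that inspects only the FD column (assumed lsof column order COMMAND PID USER FD TYPE ...; uses only the already-imported strings package; illustrative, not part of this commit):

// hasWriter reports whether any lsof line shows a file descriptor opened
// for writing ("3w") or for read/write ("4u"), looking only at the FD column.
func hasWriter(lsofOutput string) bool {
    lines := strings.Split(lsofOutput, "\n")
    for _, line := range lines[1:] { // lines[0] is the lsof header
        fields := strings.Fields(line)
        if len(fields) < 4 {
            continue
        }
        fd := fields[3]
        if strings.HasSuffix(fd, "w") || strings.HasSuffix(fd, "u") {
            return true
        }
    }
    return false
}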

func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
stat, err := file.Stat()
if err != nil {
return err
}

// files truncated from time to time, after logs from file was processed.
// Position > stat.Size() means that data was truncated and
// caret pointer must be moved to start of file.
if totalOffset > stat.Size() {
jobProvider.truncateJob(job)
if !job.isCompressed {
// files truncated from time to time, after logs from file was processed.
// Position > stat.Size() means that data was truncated and
// caret pointer must be moved to start of file.
if totalOffset > stat.Size() {
jobProvider.truncateJob(job)
}
}

// Mark job as done till new lines has appeared.
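
For a compressed job the truncation check is skipped because totalOffset counts decompressed bytes while stat.Size() reports the size of the .lz4 file on disk, so the offset routinely exceeds the file size with no truncation having occurred. A tiny illustration with made-up numbers:

// Illustration only: a decompressed offset of 1 MiB against a 200 KiB
// .lz4 file would look like truncation if the check were still applied.
func wouldFalselyTruncate() bool {
    var totalOffset int64 = 1 << 20      // decompressed bytes already consumed
    var compressedSize int64 = 200 << 10 // what stat.Size() reports for the .lz4 file
    return totalOffset > compressedSize  // true, yet the file was never truncated
}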
