diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index f68669a6b..91483b2da 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -60,13 +60,15 @@ type jobProvider struct {
 }
 
 type Job struct {
-	file      *os.File
-	inode     inodeID
-	sourceID  pipeline.SourceID // some value to distinguish jobs with same inode
-	filename  string
-	symlink   string
-	curOffset int64  // offset to not call Seek() everytime
-	tail      []byte // some data of a new line read by worker, to not seek backwards to read from line start
+	file         *os.File
+	mimeType     string
+	isCompressed bool
+	inode        inodeID
+	sourceID     pipeline.SourceID // some value to distinguish jobs with same inode
+	filename     string
+	symlink      string
+	curOffset    int64  // offset to not call Seek() everytime
+	tail         []byte // some data of a new line read by worker, to not seek backwards to read from line start
 
 	ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
 	lastEventSeq   uint64
@@ -83,10 +85,15 @@ type Job struct {
 	mu *sync.Mutex
 }
 
-func (j *Job) seek(offset int64, whence int, hint string) int64 {
-	n, err := j.file.Seek(offset, whence)
-	if err != nil {
-		logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+// seek positions the underlying file and caches the resulting offset.
+// Compressed jobs cannot seek, so their offset stays at the zero value of n.
+func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
+	var err error
+	if !j.isCompressed {
+		n, err = j.file.Seek(offset, whence)
+		if err != nil {
+			logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+		}
 	}
 	j.curOffset = n
 	return n
@@ -354,6 +361,12 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
 	}
 }
 
+// isCompressed reports whether the given MIME type denotes a compressed log
+// format, for which seek/truncate handling must be skipped.
+func isCompressed(mimeType string) bool {
+	return mimeType == "application/x-lz4"
+}
+
 func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
 	sourceID := sourceIDByStat(stat, symlink)
 
@@ -370,12 +383,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
 	}
 
 	inode := getInode(stat)
+	mimeType := getMimeType(filename)
+
 	job := &Job{
-		file:     file,
-		inode:    inode,
-		filename: filename,
-		symlink:  symlink,
-		sourceID: sourceID,
+		file:         file,
+		isCompressed: isCompressed(mimeType),
+		mimeType:     mimeType,
+		inode:        inode,
+		filename:     filename,
+		symlink:      symlink,
+		sourceID:     sourceID,
 
 		isVirgin: true,
 		isDone:   true,
diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go
index 1a6d6684a..0822881ec 100644
--- a/plugin/input/file/worker.go
+++ b/plugin/input/file/worker.go
@@ -89,11 +89,22 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
 		}
 	}
 
-	mimeType := getMimeType(file.Name())
 	var reader io.Reader
-
-	if mimeType == "application/x-lz4" {
+	if isCompressed(job.mimeType) {
 		lz4Reader := lz4.NewReader(file)
+		// Catch up to the first committed offset: the lz4 stream cannot be
+		// seeked, so decompress and discard data until we are within one
+		// read buffer of the target offset.
+		if len(offsets) > 0 {
+			for lastOffset+int64(readBufferSize) < offsets[0].Offset {
+				n, err := lz4Reader.Read(readBuf)
+				lastOffset += int64(n) // per io.Reader, n > 0 bytes were consumed even when err != nil
+				if err != nil {
+					// Break on EOF and on any other error alike: retrying a
+					// persistent non-EOF error here would loop forever.
+					break
+				}
+			}
+		}
 		reader = lz4Reader
 	} else {
 		reader = file
@@ -202,11 +213,15 @@ func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, t
 		return err
 	}
 
-	// files truncated from time to time, after logs from file was processed.
-	// Position > stat.Size() means that data was truncated and
-	// caret pointer must be moved to start of file.
-	if totalOffset > stat.Size() {
-		jobProvider.truncateJob(job)
+	// Truncation detection does not apply to compressed jobs: their offset
+	// counts decompressed bytes, which is not comparable with the on-disk size.
+	if !job.isCompressed {
+		// files truncated from time to time, after logs from file was processed.
+		// Position > stat.Size() means that data was truncated and
+		// caret pointer must be moved to start of file.
+		if totalOffset > stat.Size() {
+			jobProvider.truncateJob(job)
+		}
 	}
 
 	// Mark job as done till new lines has appeared.