From ea73410c20ec63d999f9215067f229164f5beb46 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 4 Oct 2023 12:54:29 +0000 Subject: [PATCH] [8.10](backport #36736) Re-use buffers to optimise memory allocation in fingerprint (#36739) * Re-use buffers to optimise memory allocation in fingerprint (#36736) This dramatically drops the memory usage, particularly on large amount of files. (cherry picked from commit 429b38f8cd1df1187c0642aa04882598499c2181) * Fix changelog --------- Co-authored-by: Denis --- CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/fswatch.go | 30 ++++++++------- filebeat/input/filestream/fswatch_test.go | 46 +++++++++++------------ 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c7428267700c..d73663a6df0e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add nginx ingress_controller parsing if one of upstreams fails to return response {pull}34787[34787] - Add oracle authentication messages parsing {pull}35127[35127] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] +- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736] *Auditbeat* diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 8986794340d3..a32b4409ef2d 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -18,10 +18,10 @@ package filestream import ( - "bufio" "crypto/sha256" "encoding/hex" "fmt" + "hash" "io" "os" "path/filepath" @@ -282,16 +282,19 @@ func defaultFileScannerConfig() fileScannerConfig { // fileScanner looks for files which match the patterns in paths. // It is able to exclude files and symlinks. type fileScanner struct { - paths []string - cfg fileScannerConfig - log *logp.Logger + paths []string + cfg fileScannerConfig + log *logp.Logger + hasher hash.Hash + readBuffer []byte } -func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner, error) { +func newFileScanner(paths []string, config fileScannerConfig) (*fileScanner, error) { s := fileScanner{ - paths: paths, - cfg: config, - log: logp.NewLogger(scannerDebugKey), + paths: paths, + cfg: config, + log: logp.NewLogger(scannerDebugKey), + hasher: sha256.New(), } if s.cfg.Fingerprint.Enabled { @@ -300,6 +303,7 @@ func newFileScanner(paths []string, config fileScannerConfig) (loginp.FSScanner, return nil, fmt.Errorf("error while reading configuration of fingerprint: %w", err) } s.log.Debugf("fingerprint mode enabled: offset %d, length %d", s.cfg.Fingerprint.Offset, s.cfg.Fingerprint.Length) + s.readBuffer = make([]byte, s.cfg.Fingerprint.Length) } err := s.resolveRecursiveGlobs(config) @@ -463,6 +467,7 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript if s.cfg.Fingerprint.Enabled { fileSize := it.info.Size() + // we should not open the file if we know it's too small minSize := s.cfg.Fingerprint.Offset + s.cfg.Fingerprint.Length if fileSize < minSize { return fd, fmt.Errorf("filesize of %q is %d bytes, expected at least %d bytes for fingerprinting", fd.Filename, fileSize, minSize) @@ -481,10 +486,9 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript } } - bfile := bufio.NewReaderSize(file, int(s.cfg.Fingerprint.Length)) - r := io.LimitReader(bfile, s.cfg.Fingerprint.Length) - h := sha256.New() - written, err := io.Copy(h, r) + s.hasher.Reset() + lr := io.LimitReader(file, s.cfg.Fingerprint.Length) + written, err := io.CopyBuffer(s.hasher, lr, s.readBuffer) if err != nil { return fd, fmt.Errorf("failed to compute hash for first %d bytes of %q: %w", s.cfg.Fingerprint.Length, fd.Filename, err) } @@ -492,7 +496,7 @@ func (s *fileScanner) toFileDescriptor(it *ingestTarget) (fd loginp.FileDescript return fd, fmt.Errorf("failed to read %d bytes from %q to compute fingerprint, read only %d", written, fd.Filename, s.cfg.Fingerprint.Length) } - fd.Fingerprint = hex.EncodeToString(h.Sum(nil)) + fd.Fingerprint = hex.EncodeToString(s.hasher.Sum(nil)) } return fd, nil diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index ded2459628be..ffa9b9c510e4 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -831,15 +831,14 @@ func BenchmarkGetFiles(b *testing.B) { err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777) require.NoError(b, err) } - - s := fileScanner{ - paths: []string{filepath.Join(dir, "*.log")}, - cfg: fileScannerConfig{ - Fingerprint: fingerprintConfig{ - Enabled: false, - }, + paths := []string{filepath.Join(dir, "*.log")} + cfg := fileScannerConfig{ + Fingerprint: fingerprintConfig{ + Enabled: false, }, } + s, err := newFileScanner(paths, cfg) + require.NoError(b, err) for i := 0; i < b.N; i++ { files := s.GetFiles() @@ -857,17 +856,16 @@ func BenchmarkGetFilesWithFingerprint(b *testing.B) { err := os.WriteFile(filename, []byte(strings.Repeat(content, 1024)), 0777) require.NoError(b, err) } - - s := fileScanner{ - paths: []string{filepath.Join(dir, "*.log")}, - cfg: fileScannerConfig{ - Fingerprint: fingerprintConfig{ - Enabled: true, - Offset: 0, - Length: 1024, - }, + paths := []string{filepath.Join(dir, "*.log")} + cfg := fileScannerConfig{ + Fingerprint: fingerprintConfig{ + Enabled: true, + Offset: 0, + Length: 1024, }, } + s, err := newFileScanner(paths, cfg) + require.NoError(b, err) for i := 0; i < b.N; i++ { files := s.GetFiles() @@ -947,16 +945,16 @@ func BenchmarkToFileDescriptor(b *testing.B) { err := os.WriteFile(filename, []byte(strings.Repeat("a", 1024)), 0777) require.NoError(b, err) - s := fileScanner{ - paths: []string{filename}, - cfg: fileScannerConfig{ - Fingerprint: fingerprintConfig{ - Enabled: true, - Offset: 0, - Length: 1024, - }, + paths := []string{filename} + cfg := fileScannerConfig{ + Fingerprint: fingerprintConfig{ + Enabled: true, + Offset: 0, + Length: 1024, }, } + s, err := newFileScanner(paths, cfg) + require.NoError(b, err) it, err := s.getIngestTarget(filename) require.NoError(b, err)