// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/cleanup.go -package=mocks . FileOperations

import (
	"context"
	"github.com/specterops/bloodhound/log"
	"os"
	"path/filepath"
	"sync"
)

// FileOperations is an interface for describing filesystem actions. This implementation exists due to deficiencies in
// the fs package where the fs.FS interface does not support destructive operations like RemoveAll.
//
// It is the seam through which the orphan file sweeper touches the filesystem, which allows tests to substitute the
// generated mock for the real os-backed implementation.
type FileOperations interface {
	// ReadDir returns the directory entries found at path. The os-backed implementation delegates to os.ReadDir.
	ReadDir(path string) ([]os.DirEntry, error)

	// RemoveAll deletes path. The os-backed implementation delegates to os.RemoveAll.
	RemoveAll(path string) error
}

// osFileOperations is a passthrough implementation of the FileOperations interface that uses the os package
// functions.
+type osFileOperations struct{} + +func NewOSFileOperations() FileOperations { + return osFileOperations{} +} + +func (s osFileOperations) ReadDir(path string) ([]os.DirEntry, error) { + return os.ReadDir(path) +} + +func (s osFileOperations) RemoveAll(path string) error { + return os.RemoveAll(path) +} + +// OrphanFileSweeper is a file cleanup utility that allows only one goroutine to attempt file cleanup at a time. +type OrphanFileSweeper struct { + lock *sync.Mutex + fileOps FileOperations + tempDirectoryRootPath string +} + +func NewOrphanFileSweeper(fileOps FileOperations, tempDirectoryRootPath string) *OrphanFileSweeper { + return &OrphanFileSweeper{ + lock: &sync.Mutex{}, + fileOps: fileOps, + tempDirectoryRootPath: tempDirectoryRootPath, + } +} + +// Clear takes a context and a list of expected file names. The function will list all directory entries within the +// configured tempDirectoryRootPath that the sweeper was instantiated with and compare the resulting list against the +// passed expected file names. The function will then call RemoveAll on each directory entry not found in the expected +// file name slice. +func (s *OrphanFileSweeper) Clear(ctx context.Context, expectedFileNames []string) { + // Only allow one background thread to run for clearing orphaned data/ + if !s.lock.TryLock() { + return + } + + // Release the lock once finished + defer s.lock.Unlock() + + if dirEntries, err := s.fileOps.ReadDir(s.tempDirectoryRootPath); err != nil { + log.Errorf("Failed reading work directory %s: %v", s.tempDirectoryRootPath, err) + } else { + numDeleted := 0 + + // Remove expected files from the deletion list + for _, expectedFileName := range expectedFileNames { + for idx, dirEntry := range dirEntries { + if expectedFileName == dirEntry.Name() { + dirEntries = append(dirEntries[:idx], dirEntries[idx+1:]...) 
+ } + } + } + + for _, orphanedDirEntry := range dirEntries { + // Check for context cancellation before each file deletion + if ctx.Err() != nil { + break + } + + fullPath := filepath.Join(s.tempDirectoryRootPath, orphanedDirEntry.Name()) + + if err := s.fileOps.RemoveAll(fullPath); err != nil { + log.Errorf("Failed removing orphaned file %s: %v", fullPath, err) + } + + numDeleted += 1 + } + + if numDeleted > 0 { + log.Infof("Finished removing %d orphaned ingest files", numDeleted) + } + } +} diff --git a/cmd/api/src/daemons/datapipe/cleanup_test.go b/cmd/api/src/daemons/datapipe/cleanup_test.go new file mode 100644 index 0000000000..7717e5dc0d --- /dev/null +++ b/cmd/api/src/daemons/datapipe/cleanup_test.go @@ -0,0 +1,159 @@ +// Copyright 2024 Specter Ops, Inc. +// +// Licensed under the Apache License, Version 2.0 +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
//
// SPDX-License-Identifier: Apache-2.0

package datapipe_test

import (
	"context"
	"github.com/specterops/bloodhound/src/daemons/datapipe"
	"github.com/specterops/bloodhound/src/daemons/datapipe/mocks"
	"go.uber.org/mock/gomock"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
	"testing"
)

// dirEntry is a minimal in-memory os.DirEntry used to feed canned directory listings to the sweeper under test. Each
// method simply returns the corresponding field.
type dirEntry struct {
	name    string
	isDir   bool
	mode    fs.FileMode
	info    fs.FileInfo
	infoErr error
}

// Name returns the configured entry name.
func (s dirEntry) Name() string {
	return s.name
}

// IsDir reports the configured directory flag.
func (s dirEntry) IsDir() bool {
	return s.isDir
}

// Type returns the configured file mode.
func (s dirEntry) Type() fs.FileMode {
	return s.mode
}

// Info returns the configured file info and error pair.
func (s dirEntry) Info() (fs.FileInfo, error) {
	return s.info, s.infoErr
}

// TestOrphanFileSweeper_Clear exercises OrphanFileSweeper.Clear against a mocked FileOperations implementation. The
// gomock controller fails the test on any call that was not explicitly expected, which is what makes the negative
// assertions below (no RemoveAll call) meaningful.
func TestOrphanFileSweeper_Clear(t *testing.T) {
	const workDir = "/fake/work/dir"

	t.Run("Allow Only One Goroutine", func(t *testing.T) {
		var (
			mockCtrl       = gomock.NewController(t)
			mockFileOps    = mocks.NewMockFileOperations(mockCtrl)
			sweeper        = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
			wgCoordination = &sync.WaitGroup{}
			wgReadDir      = &sync.WaitGroup{}
		)

		defer mockCtrl.Finish()

		// Prep the wait groups for coordination
		wgCoordination.Add(1)
		wgReadDir.Add(1)

		// ReadDir is expected exactly once: only the background goroutine may get far enough to call it
		mockFileOps.EXPECT().ReadDir(workDir).DoAndReturn(func(path string) ([]os.DirEntry, error) {
			// Release the coordination wait group
			wgCoordination.Done()

			// Block on the readDir wait group
			wgReadDir.Wait()

			return nil, nil
		})

		// Launch the clear function in a goroutine. The wait group will cause this call to block
		go sweeper.Clear(context.Background(), []string{})

		// Wait for the goroutine to reach the ReadDir function
		wgCoordination.Wait()

		// Run the clear function in the current goroutine as this should exit without blocking
		sweeper.Clear(context.Background(), []string{})

		// Release the wait group to complete the test
		wgReadDir.Done()
	})

	t.Run("Clear Orphan Files", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// With no expected files the single listed entry is an orphan and must be removed by its joined path
		mockFileOps.EXPECT().RemoveAll(filepath.Join(workDir, "1")).Return(nil)

		sweeper.Clear(context.Background(), []string{})
	})

	t.Run("Exclude Expected Files", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// This one is a negative assertion. Because we're passing in "1" to be excluded the listed dirEntries will be
		// empty and therefore the sweeper MUST NOT call RemoveAll() on the FileOperations mock. If RemoveAll() is
		// called then this test MUST fail.
		sweeper.Clear(context.Background(), []string{"1"})
	})

	t.Run("Exit on Context Cancellation", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// Create a cancellable context and cancel it right away
		ctx, done := context.WithCancel(context.Background())
		done()

		// When passed the cancelled context the sweeper should not call RemoveAll for entry "1"
		sweeper.Clear(ctx, []string{})
	})
}
// Daemon holds the dependencies and runtime state of the datapipe daemon.
type Daemon struct {
	// db is populated from connections.RDMS by NewDaemon.
	db database.Database
	// graphdb is populated from connections.Graph by NewDaemon.
	graphdb graph.Database
	cache   cache.Cache
	cfg     config.Configuration
	// analysisRequested starts out as a zero-value atomic.Bool.
	// NOTE(review): presumably set when an analysis run has been requested; confirm against callers.
	analysisRequested *atomic.Bool
	tickInterval      time.Duration
	// status is initialized to model.DatapipeStatusIdle by NewDaemon.
	status model.DatapipeStatusWrapper
	// ctx is the context handed to NewDaemon; it is forwarded to analysis operations and to the orphan file sweeper.
	ctx context.Context
	// orphanedFileSweeper is constructed over the os-backed FileOperations and cfg.TempDirectory().
	orphanedFileSweeper *OrphanFileSweeper
}
error { } func (s *Daemon) clearOrphanedData() { - // Only allow one background thread to run for clearing orphaned data - if !s.clearOrphanedFilesLock.TryLock() { - return - } - - // Release the lock once finished - defer s.clearOrphanedFilesLock.Unlock() - - relativeTmpDir := s.cfg.TempDirectory() - - if orphanFiles, err := os.ReadDir(s.cfg.TempDirectory()); err != nil { - log.Errorf("Failed fetching available files: %v", err) - } else if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil { - log.Errorf("Failed fetching available ingest tasks: %v", err) + if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil { + log.Errorf("Failed fetching available file upload ingest tasks: %v", err) } else { - for _, ingestTask := range ingestTasks { - for idx, orphanFile := range orphanFiles { - if ingestTask.FileName == filepath.Join(relativeTmpDir, orphanFile.Name()) { - orphanFiles = append(orphanFiles[:idx], orphanFiles[idx+1:]...) - } - } - } + expectedFiles := make([]string, len(ingestTasks)) - for _, orphanFile := range orphanFiles { - fullPath := filepath.Join(relativeTmpDir, orphanFile.Name()) - - if err := os.RemoveAll(fullPath); err != nil { - log.Errorf("Failed removing file: %s", fullPath) - } - - // Check to see if we need to exit after every file deletion - if s.ctx.Err() != nil { - return - } + for idx, ingestTask := range ingestTasks { + expectedFiles[idx] = ingestTask.FileName } - if len(orphanFiles) > 0 { - log.Infof("Finished removing %d orphaned ingest files", len(orphanFiles)) - } + go s.orphanedFileSweeper.Clear(s.ctx, expectedFiles) } } diff --git a/cmd/api/src/daemons/datapipe/mocks/cleanup.go b/cmd/api/src/daemons/datapipe/mocks/cleanup.go new file mode 100644 index 0000000000..d7d3b8aa15 --- /dev/null +++ b/cmd/api/src/daemons/datapipe/mocks/cleanup.go @@ -0,0 +1,80 @@ +// Copyright 2023 Specter Ops, Inc. +// +// Licensed under the Apache License, Version 2.0 +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/specterops/bloodhound/src/daemons/datapipe (interfaces: FileOperations)

// Package mocks is a generated GoMock package.
package mocks

import (
	fs "io/fs"
	reflect "reflect"

	gomock "go.uber.org/mock/gomock"
)

// MockFileOperations is a mock of FileOperations interface.
//
// This type is generated output: change the FileOperations interface and regenerate rather than editing it by hand.
type MockFileOperations struct {
	ctrl     *gomock.Controller
	recorder *MockFileOperationsMockRecorder
}

// MockFileOperationsMockRecorder is the mock recorder for MockFileOperations.
type MockFileOperationsMockRecorder struct {
	mock *MockFileOperations
}

// NewMockFileOperations creates a new mock instance.
func NewMockFileOperations(ctrl *gomock.Controller) *MockFileOperations {
	mock := &MockFileOperations{ctrl: ctrl}
	mock.recorder = &MockFileOperationsMockRecorder{mock}
	return mock
}

// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockFileOperations) EXPECT() *MockFileOperationsMockRecorder {
	return m.recorder
}

// ReadDir mocks base method.
func (m *MockFileOperations) ReadDir(arg0 string) ([]fs.DirEntry, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ReadDir", arg0)
	ret0, _ := ret[0].([]fs.DirEntry)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// ReadDir indicates an expected call of ReadDir.
func (mr *MockFileOperationsMockRecorder) ReadDir(arg0 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadDir", reflect.TypeOf((*MockFileOperations)(nil).ReadDir), arg0)
}

// RemoveAll mocks base method.
func (m *MockFileOperations) RemoveAll(arg0 string) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "RemoveAll", arg0)
	ret0, _ := ret[0].(error)
	return ret0
}

// RemoveAll indicates an expected call of RemoveAll.
func (mr *MockFileOperationsMockRecorder) RemoveAll(arg0 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveAll", reflect.TypeOf((*MockFileOperations)(nil).RemoveAll), arg0)
}