Skip to content

Commit

Permalink
chore: simplify datapipe and test orphan file cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zinic committed Feb 9, 2024
1 parent e1bfed1 commit 250a7c4
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 57 deletions.
113 changes: 113 additions & 0 deletions cmd/api/src/daemons/datapipe/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/cleanup.go -package=mocks . FileOperations

import (
"context"
"github.com/specterops/bloodhound/log"
"os"
"path/filepath"
"sync"
)

// FileOperations abstracts the small set of filesystem calls used for orphan file cleanup. It exists because the
// fs.FS interface in the fs package has no destructive operations such as RemoveAll, and it gives tests a seam for
// substituting a mock.
type FileOperations interface {
	// ReadDir returns the directory entries found at path.
	ReadDir(path string) ([]os.DirEntry, error)

	// RemoveAll removes path.
	RemoveAll(path string) error
}

// osFileOperations implements FileOperations by delegating every call straight to the os package. It carries no
// state of its own.
type osFileOperations struct{}

// NewOSFileOperations returns a FileOperations implementation backed by the os package.
func NewOSFileOperations() FileOperations {
	var passthrough osFileOperations
	return passthrough
}

// ReadDir forwards to os.ReadDir.
func (osFileOperations) ReadDir(path string) ([]os.DirEntry, error) {
	entries, err := os.ReadDir(path)
	return entries, err
}

// RemoveAll forwards to os.RemoveAll.
func (osFileOperations) RemoveAll(path string) error {
	err := os.RemoveAll(path)
	return err
}

// OrphanFileSweeper is a file cleanup utility that allows only one goroutine to attempt file cleanup at a time.
type OrphanFileSweeper struct {
	// lock is acquired with TryLock so that concurrent Clear calls return instead of queueing.
	lock *sync.Mutex

	// fileOps performs the directory listing and removals.
	fileOps FileOperations

	// tempDirectoryRootPath is the directory whose entries are swept.
	tempDirectoryRootPath string
}

// NewOrphanFileSweeper returns a sweeper that lists and removes entries under tempDirectoryRootPath using fileOps.
func NewOrphanFileSweeper(fileOps FileOperations, tempDirectoryRootPath string) *OrphanFileSweeper {
	return &OrphanFileSweeper{
		lock:                  &sync.Mutex{},
		fileOps:               fileOps,
		tempDirectoryRootPath: tempDirectoryRootPath,
	}
}

// Clear takes a context and a list of expected file names. The function lists all directory entries within the
// tempDirectoryRootPath that the sweeper was instantiated with and calls RemoveAll on every entry that is not
// expected.
//
// An expected file name may be given either as a bare directory entry name ("upload-1") or as a path that resolves
// to an entry directly under tempDirectoryRootPath (filepath.Join(tempDirectoryRootPath, "upload-1")). Both forms
// protect the entry from deletion.
// NOTE(review): the datapipe caller appears to pass ingest task file names that include the temp directory; confirm
// that this is the form stored for ingest tasks.
//
// If another goroutine is already running Clear on this sweeper the call returns immediately without doing any
// work. Failures are logged rather than returned. Cancelling ctx stops the sweep before the next removal.
func (s *OrphanFileSweeper) Clear(ctx context.Context, expectedFileNames []string) {
	// Only allow one background thread to run for clearing orphaned data.
	if !s.lock.TryLock() {
		return
	}

	// Release the lock once finished
	defer s.lock.Unlock()

	dirEntries, err := s.fileOps.ReadDir(s.tempDirectoryRootPath)
	if err != nil {
		log.Errorf("Failed reading work directory %s: %v", s.tempDirectoryRootPath, err)
		return
	}

	// Index the expected names once. Each name is stored verbatim and in cleaned form so that a lookup by bare entry
	// name and a lookup by joined path (filepath.Join returns a cleaned path) can both succeed. Using a set avoids
	// deleting from the directory listing while iterating over it.
	expected := make(map[string]struct{}, 2*len(expectedFileNames))

	for _, expectedFileName := range expectedFileNames {
		expected[expectedFileName] = struct{}{}
		expected[filepath.Clean(expectedFileName)] = struct{}{}
	}

	numDeleted := 0

	for _, dirEntry := range dirEntries {
		// Check for context cancellation before each file deletion
		if ctx.Err() != nil {
			break
		}

		fullPath := filepath.Join(s.tempDirectoryRootPath, dirEntry.Name())

		if _, isExpected := expected[dirEntry.Name()]; isExpected {
			continue
		}

		if _, isExpected := expected[fullPath]; isExpected {
			continue
		}

		if err := s.fileOps.RemoveAll(fullPath); err != nil {
			log.Errorf("Failed removing orphaned file %s: %v", fullPath, err)

			// Only successful removals are counted so the summary below is accurate.
			continue
		}

		numDeleted++
	}

	if numDeleted > 0 {
		log.Infof("Finished removing %d orphaned ingest files", numDeleted)
	}
}
159 changes: 159 additions & 0 deletions cmd/api/src/daemons/datapipe/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe_test

import (
"context"
"github.com/specterops/bloodhound/src/daemons/datapipe"
"github.com/specterops/bloodhound/src/daemons/datapipe/mocks"
"go.uber.org/mock/gomock"
"io/fs"
"os"
"path/filepath"
"sync"
"testing"
)

// dirEntry is a hand-written test double whose methods return the values stored in its fields, letting tests hand
// fabricated directory listings to the code under test.
type dirEntry struct {
	name    string
	isDir   bool
	mode    fs.FileMode
	info    fs.FileInfo
	infoErr error
}

// Compile-time check that the double provides the directory entry method set.
var _ fs.DirEntry = dirEntry{}

// Name returns the configured entry name.
func (e dirEntry) Name() string { return e.name }

// IsDir returns the configured directory flag.
func (e dirEntry) IsDir() bool { return e.isDir }

// Type returns the configured file mode.
func (e dirEntry) Type() fs.FileMode { return e.mode }

// Info returns the configured file info together with the configured error.
func (e dirEntry) Info() (fs.FileInfo, error) {
	info, err := e.info, e.infoErr
	return info, err
}

// TestOrphanFileSweeper_Clear exercises OrphanFileSweeper.Clear against a mocked FileOperations: mutual exclusion
// between concurrent callers, removal of unexpected entries, preservation of expected entries and early exit on
// context cancellation.
func TestOrphanFileSweeper_Clear(t *testing.T) {
	const workDir = "/fake/work/dir"

	t.Run("Allow Only One Goroutine", func(t *testing.T) {
		var (
			mockCtrl       = gomock.NewController(t)
			mockFileOps    = mocks.NewMockFileOperations(mockCtrl)
			sweeper        = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
			wgCoordination = &sync.WaitGroup{}
			wgReadDir      = &sync.WaitGroup{}
			wgBackground   = &sync.WaitGroup{}
		)

		defer mockCtrl.Finish()

		// Prep the wait groups for coordination
		wgCoordination.Add(1)
		wgReadDir.Add(1)
		wgBackground.Add(1)

		// ReadDir is expected exactly once. A second call would mean the second Clear below got past the lock.
		mockFileOps.EXPECT().ReadDir(workDir).DoAndReturn(func(path string) ([]os.DirEntry, error) {
			// Release the coordination wait group
			wgCoordination.Done()

			// Block on the readDir wait group
			wgReadDir.Wait()

			return nil, nil
		})

		// Launch the clear function in a goroutine. The wait group will cause this call to block
		go func() {
			defer wgBackground.Done()

			sweeper.Clear(context.Background(), []string{})
		}()

		// Wait for the go routine to reach the ReadDir function
		wgCoordination.Wait()

		// Run the clear function in the current thread context as this should exit without blocking
		sweeper.Clear(context.Background(), []string{})

		// Release the wait group to complete the test
		wgReadDir.Done()

		// Wait for the background call to return so that it cannot outlive this subtest and its mock controller
		wgBackground.Wait()
	})

	t.Run("Clear Orphan Files", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// With no expected file names the single listed entry is an orphan and must be removed by its full path
		mockFileOps.EXPECT().RemoveAll(filepath.Join(workDir, "1")).Return(nil)

		sweeper.Clear(context.Background(), []string{})
	})

	t.Run("Exclude Expected Files", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// This one is a negative assertion. Because we're passing in "1" to be excluded the listed dirEntries will be
		// empty and therefore the sweeper MUST NOT call RemoveAll() on the FileOperations mock. If RemoveAll() is
		// called then this test MUST fail.
		sweeper.Clear(context.Background(), []string{"1"})
	})

	t.Run("Exit on Context Cancellation", func(t *testing.T) {
		var (
			mockCtrl    = gomock.NewController(t)
			mockFileOps = mocks.NewMockFileOperations(mockCtrl)
			sweeper     = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
		)

		defer mockCtrl.Finish()

		mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
			dirEntry{
				name: "1",
			},
		}, nil)

		// Create a cancellable context and cancel it right away
		ctx, done := context.WithCancel(context.Background())
		done()

		// When passed in with the cancelled context the sweeper should not call os.RemoveAll("1")
		sweeper.Clear(ctx, []string{})
	})
}
82 changes: 25 additions & 57 deletions cmd/api/src/daemons/datapipe/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@
//
// SPDX-License-Identifier: Apache-2.0

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/mock.go -package=mocks . Tasker
//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/tasker.go -package=mocks . Tasker
package datapipe

import (
"context"
"errors"
"github.com/specterops/bloodhound/src/bootstrap"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

Expand All @@ -47,15 +44,15 @@ type Tasker interface {
}

type Daemon struct {
db database.Database
graphdb graph.Database
cache cache.Cache
cfg config.Configuration
analysisRequested *atomic.Bool
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
clearOrphanedFilesLock *sync.Mutex
db database.Database
graphdb graph.Database
cache cache.Cache
cfg config.Configuration
analysisRequested *atomic.Bool
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
orphanedFileSweeper *OrphanFileSweeper
}

func (s *Daemon) Name() string {
Expand All @@ -64,14 +61,14 @@ func (s *Daemon) Name() string {

func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootstrap.DatabaseConnections[*database.BloodhoundDB, *graph.DatabaseSwitch], cache cache.Cache, tickInterval time.Duration) *Daemon {
return &Daemon{
db: connections.RDMS,
graphdb: connections.Graph,
cache: cache,
cfg: cfg,
ctx: ctx,
analysisRequested: &atomic.Bool{},
clearOrphanedFilesLock: &sync.Mutex{},
tickInterval: tickInterval,
db: connections.RDMS,
graphdb: connections.Graph,
cache: cache,
cfg: cfg,
ctx: ctx,
analysisRequested: &atomic.Bool{},
orphanedFileSweeper: NewOrphanFileSweeper(NewOSFileOperations(), cfg.TempDirectory()),
tickInterval: tickInterval,
status: model.DatapipeStatusWrapper{
Status: model.DatapipeStatusIdle,
UpdatedAt: time.Now().UTC(),
Expand Down Expand Up @@ -105,7 +102,7 @@ func (s *Daemon) analyze() {
}

s.status.Update(model.DatapipeStatusAnalyzing, false)
log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()
defer log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()

if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
if errors.Is(err, ErrAnalysisFailed) {
Expand Down Expand Up @@ -190,44 +187,15 @@ func (s *Daemon) Stop(ctx context.Context) error {
}

func (s *Daemon) clearOrphanedData() {
// Only allow one background thread to run for clearing orphaned data
if !s.clearOrphanedFilesLock.TryLock() {
return
}

// Release the lock once finished
defer s.clearOrphanedFilesLock.Unlock()

relativeTmpDir := s.cfg.TempDirectory()

if orphanFiles, err := os.ReadDir(s.cfg.TempDirectory()); err != nil {
log.Errorf("Failed fetching available files: %v", err)
} else if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
log.Errorf("Failed fetching available ingest tasks: %v", err)
if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
log.Errorf("Failed fetching available file upload ingest tasks: %v", err)
} else {
for _, ingestTask := range ingestTasks {
for idx, orphanFile := range orphanFiles {
if ingestTask.FileName == filepath.Join(relativeTmpDir, orphanFile.Name()) {
orphanFiles = append(orphanFiles[:idx], orphanFiles[idx+1:]...)
}
}
}
expectedFiles := make([]string, len(ingestTasks))

for _, orphanFile := range orphanFiles {
fullPath := filepath.Join(relativeTmpDir, orphanFile.Name())

if err := os.RemoveAll(fullPath); err != nil {
log.Errorf("Failed removing file: %s", fullPath)
}

// Check to see if we need to exit after every file deletion
if s.ctx.Err() != nil {
return
}
for idx, ingestTask := range ingestTasks {
expectedFiles[idx] = ingestTask.FileName
}

if len(orphanFiles) > 0 {
log.Infof("Finished removing %d orphaned ingest files", len(orphanFiles))
}
go s.orphanedFileSweeper.Clear(s.ctx, expectedFiles)
}
}
Loading

0 comments on commit 250a7c4

Please sign in to comment.