Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: simplify datapipe and test orphan file cleanup #391

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions cmd/api/src/daemons/datapipe/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/cleanup.go -package=mocks . FileOperations

import (
"context"
"github.com/specterops/bloodhound/log"
"os"
"path/filepath"
"sync"
)

// FileOperations describes the filesystem actions needed by the orphan file cleanup. The interface exists due to
// deficiencies in the fs package, where the fs.FS interface does not support destructive operations like RemoveAll.
type FileOperations interface {
	// ReadDir lists the directory entries found at path.
	ReadDir(path string) ([]os.DirEntry, error)

	// RemoveAll removes the filesystem entry at path.
	RemoveAll(path string) error
}

// osFileOperations satisfies FileOperations by handing every call straight to the matching os package function. It
// carries no state.
type osFileOperations struct{}

// NewOSFileOperations returns a FileOperations value backed by the os package.
func NewOSFileOperations() FileOperations {
	var ops osFileOperations
	return ops
}

// ReadDir delegates to os.ReadDir.
func (osFileOperations) ReadDir(path string) ([]os.DirEntry, error) {
	entries, err := os.ReadDir(path)
	return entries, err
}

// RemoveAll delegates to os.RemoveAll.
func (osFileOperations) RemoveAll(path string) error {
	err := os.RemoveAll(path)
	return err
}

// OrphanFileSweeper is a file cleanup utility that allows only one goroutine to attempt file cleanup at a time.
type OrphanFileSweeper struct {
// lock is taken with TryLock by Clear so that an overlapping sweep returns immediately instead of queueing.
lock *sync.Mutex
// fileOps performs the directory listing and removal calls; it is supplied through NewOrphanFileSweeper.
fileOps FileOperations
// tempDirectoryRootPath is the directory whose entries Clear lists and removes.
tempDirectoryRootPath string
}

// NewOrphanFileSweeper builds an OrphanFileSweeper that performs its filesystem access through fileOps and sweeps the
// directory at tempDirectoryRootPath. Every sweeper receives its own freshly allocated mutex.
func NewOrphanFileSweeper(fileOps FileOperations, tempDirectoryRootPath string) *OrphanFileSweeper {
	sweeper := new(OrphanFileSweeper)

	sweeper.lock = new(sync.Mutex)
	sweeper.fileOps = fileOps
	sweeper.tempDirectoryRootPath = tempDirectoryRootPath

	return sweeper
}

// Clear removes orphaned entries from the temp directory the sweeper was instantiated with.
//
// The function lists all directory entries within tempDirectoryRootPath and calls RemoveAll on each entry that is not
// referenced by expectedFileNames. An expected file name protects an entry when it is either the bare directory entry
// name (for example "upload-1") or a path that resolves to that entry inside the temp directory (for example
// filepath.Join(tempDirectoryRootPath, "upload-1")). Callers that track files by full path therefore do not have
// their in-use files swept.
//
// Only one goroutine may sweep at a time: if another call to Clear is still running, this call returns immediately
// without doing any work. The context is checked before every removal so that cancellation stops the sweep early.
// Errors are logged, never returned, and only successful removals are counted in the final log line.
func (s *OrphanFileSweeper) Clear(ctx context.Context, expectedFileNames []string) {
	// Only allow one background thread to run for clearing orphaned data
	if !s.lock.TryLock() {
		return
	}

	// Release the lock once finished
	defer s.lock.Unlock()

	dirEntries, err := s.fileOps.ReadDir(s.tempDirectoryRootPath)
	if err != nil {
		log.Errorf("Failed reading work directory %s: %v", s.tempDirectoryRootPath, err)
		return
	}

	// Index the expected names once. Cleaning normalizes forms like "./name" and makes full paths comparable with
	// the output of filepath.Join below, which always returns a cleaned path.
	expected := make(map[string]struct{}, len(expectedFileNames))

	for _, expectedFileName := range expectedFileNames {
		expected[filepath.Clean(expectedFileName)] = struct{}{}
	}

	numDeleted := 0

	for _, dirEntry := range dirEntries {
		// Check for context cancellation before each file deletion
		if ctx.Err() != nil {
			break
		}

		fullPath := filepath.Join(s.tempDirectoryRootPath, dirEntry.Name())

		// Skip entries that are still expected, whether they were named by entry name or by full path. The listing
		// itself is never mutated while it is being iterated.
		if _, isExpected := expected[dirEntry.Name()]; isExpected {
			continue
		}

		if _, isExpected := expected[fullPath]; isExpected {
			continue
		}

		if err := s.fileOps.RemoveAll(fullPath); err != nil {
			log.Errorf("Failed removing orphaned file %s: %v", fullPath, err)

			// A failed removal must not be reported as a deleted file
			continue
		}

		numDeleted++
	}

	if numDeleted > 0 {
		log.Infof("Finished removing %d orphaned ingest files", numDeleted)
	}
}
159 changes: 159 additions & 0 deletions cmd/api/src/daemons/datapipe/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe_test

import (
"context"
"github.com/specterops/bloodhound/src/daemons/datapipe"
"github.com/specterops/bloodhound/src/daemons/datapipe/mocks"
"go.uber.org/mock/gomock"
"io/fs"
"os"
"path/filepath"
"sync"
"testing"
)

// dirEntry is a hand-rolled directory entry for tests. Every accessor returns the matching field verbatim, so a test
// can describe a fake directory listing with a struct literal.
type dirEntry struct {
	name    string
	isDir   bool
	mode    fs.FileMode
	info    fs.FileInfo
	infoErr error
}

// Name reports the configured entry name.
func (d dirEntry) Name() string { return d.name }

// IsDir reports the configured directory flag.
func (d dirEntry) IsDir() bool { return d.isDir }

// Type reports the configured file mode.
func (d dirEntry) Type() fs.FileMode { return d.mode }

// Info reports the configured file info together with the configured error.
func (d dirEntry) Info() (fs.FileInfo, error) {
	info, err := d.info, d.infoErr
	return info, err
}

// TestOrphanFileSweeper_Clear exercises OrphanFileSweeper.Clear against a mocked FileOperations so that no real
// filesystem access takes place. Each subtest builds its own mock controller and sweeper rooted at workDir.
func TestOrphanFileSweeper_Clear(t *testing.T) {
// workDir is never touched on disk; it is only passed to, and matched by, the mock.
const workDir = "/fake/work/dir"

// Verifies that a second Clear call returns without doing work while a first call is still in flight.
t.Run("Allow Only One Goroutine", func(t *testing.T) {
var (
mockCtrl = gomock.NewController(t)
mockFileOps = mocks.NewMockFileOperations(mockCtrl)
sweeper = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
wgCoordination = &sync.WaitGroup{}
wgReadDir = &sync.WaitGroup{}
)

defer mockCtrl.Finish()

// Prep the wait groups for coordination
wgCoordination.Add(1)
wgReadDir.Add(1)

// Only a single ReadDir expectation is registered. The stub signals that the background call has reached
// ReadDir and then parks inside it until the test releases wgReadDir.
mockFileOps.EXPECT().ReadDir(workDir).DoAndReturn(func(path string) ([]os.DirEntry, error) {
// Release the coordination wait group
wgCoordination.Done()

// Block on the readDir wait group
wgReadDir.Wait()

return nil, nil
})

// Launch the clear function in a goroutine. The wait group will cause this call to block
go sweeper.Clear(context.Background(), []string{})

// Wait for the go routine to reach the ReadDir function
wgCoordination.Wait()

// Run the clear function in the current thread context as this should exit without blocking
sweeper.Clear(context.Background(), []string{})

// Release the wait group to complete the test
// NOTE(review): the background goroutine is not awaited after this point, so it may still be running when the
// subtest returns - confirm this is acceptable.
wgReadDir.Done()
})

// Verifies that an entry with no matching expected file name is removed using its full path under workDir.
t.Run("Clear Orphan Files", func(t *testing.T) {
var (
mockCtrl = gomock.NewController(t)
mockFileOps = mocks.NewMockFileOperations(mockCtrl)
sweeper = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
)

defer mockCtrl.Finish()

mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
dirEntry{
name: "1",
},
}, nil)

mockFileOps.EXPECT().RemoveAll(filepath.Join(workDir, "1")).Return(nil)

sweeper.Clear(context.Background(), []string{})
})

// Verifies that an entry whose name is passed as expected is left alone.
t.Run("Exclude Expected Files", func(t *testing.T) {
var (
mockCtrl = gomock.NewController(t)
mockFileOps = mocks.NewMockFileOperations(mockCtrl)
sweeper = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
)

defer mockCtrl.Finish()

mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
dirEntry{
name: "1",
},
}, nil)

// This one is a negative assertion. Because we're passing in "1" to be excluded the listed dirEntries will be
// empty and therefore the sweeper MUST NOT call RemoveAll() on the FileOperations mock. If RemoveAll() is
// called then this test MUST fail.
sweeper.Clear(context.Background(), []string{"1"})
})

// Verifies that nothing is removed once the context has been cancelled. No RemoveAll expectation is registered,
// so any removal would be a call the mock was not told to expect.
t.Run("Exit on Context Cancellation", func(t *testing.T) {
var (
mockCtrl = gomock.NewController(t)
mockFileOps = mocks.NewMockFileOperations(mockCtrl)
sweeper = datapipe.NewOrphanFileSweeper(mockFileOps, workDir)
)

defer mockCtrl.Finish()

mockFileOps.EXPECT().ReadDir(workDir).Return([]os.DirEntry{
dirEntry{
name: "1",
},
}, nil)

// Create a cancellable context and cancel it right away
ctx, done := context.WithCancel(context.Background())
done()

// When passed in with the cancelled context the sweeper should not call os.RemoveAll("1")
sweeper.Clear(ctx, []string{})
})
}
82 changes: 25 additions & 57 deletions cmd/api/src/daemons/datapipe/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@
//
// SPDX-License-Identifier: Apache-2.0

//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/mock.go -package=mocks . Tasker
//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/tasker.go -package=mocks . Tasker
package datapipe

import (
"context"
"errors"
"github.com/specterops/bloodhound/src/bootstrap"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

Expand All @@ -47,15 +44,15 @@ type Tasker interface {
}

type Daemon struct {
db database.Database
graphdb graph.Database
cache cache.Cache
cfg config.Configuration
analysisRequested *atomic.Bool
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
clearOrphanedFilesLock *sync.Mutex
db database.Database
graphdb graph.Database
cache cache.Cache
cfg config.Configuration
analysisRequested *atomic.Bool
tickInterval time.Duration
status model.DatapipeStatusWrapper
ctx context.Context
orphanedFileSweeper *OrphanFileSweeper
}

func (s *Daemon) Name() string {
Expand All @@ -64,14 +61,14 @@ func (s *Daemon) Name() string {

func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootstrap.DatabaseConnections[*database.BloodhoundDB, *graph.DatabaseSwitch], cache cache.Cache, tickInterval time.Duration) *Daemon {
return &Daemon{
db: connections.RDMS,
graphdb: connections.Graph,
cache: cache,
cfg: cfg,
ctx: ctx,
analysisRequested: &atomic.Bool{},
clearOrphanedFilesLock: &sync.Mutex{},
tickInterval: tickInterval,
db: connections.RDMS,
graphdb: connections.Graph,
cache: cache,
cfg: cfg,
ctx: ctx,
analysisRequested: &atomic.Bool{},
orphanedFileSweeper: NewOrphanFileSweeper(NewOSFileOperations(), cfg.TempDirectory()),
tickInterval: tickInterval,
status: model.DatapipeStatusWrapper{
Status: model.DatapipeStatusIdle,
UpdatedAt: time.Now().UTC(),
Expand Down Expand Up @@ -105,7 +102,7 @@ func (s *Daemon) analyze() {
}

s.status.Update(model.DatapipeStatusAnalyzing, false)
log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()
defer log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()

if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
if errors.Is(err, ErrAnalysisFailed) {
Expand Down Expand Up @@ -190,44 +187,15 @@ func (s *Daemon) Stop(ctx context.Context) error {
}

func (s *Daemon) clearOrphanedData() {
// Only allow one background thread to run for clearing orphaned data
if !s.clearOrphanedFilesLock.TryLock() {
return
}

// Release the lock once finished
defer s.clearOrphanedFilesLock.Unlock()

relativeTmpDir := s.cfg.TempDirectory()

if orphanFiles, err := os.ReadDir(s.cfg.TempDirectory()); err != nil {
log.Errorf("Failed fetching available files: %v", err)
} else if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
log.Errorf("Failed fetching available ingest tasks: %v", err)
if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
log.Errorf("Failed fetching available file upload ingest tasks: %v", err)
} else {
for _, ingestTask := range ingestTasks {
for idx, orphanFile := range orphanFiles {
if ingestTask.FileName == filepath.Join(relativeTmpDir, orphanFile.Name()) {
orphanFiles = append(orphanFiles[:idx], orphanFiles[idx+1:]...)
}
}
}
expectedFiles := make([]string, len(ingestTasks))

for _, orphanFile := range orphanFiles {
fullPath := filepath.Join(relativeTmpDir, orphanFile.Name())

if err := os.RemoveAll(fullPath); err != nil {
log.Errorf("Failed removing file: %s", fullPath)
}

// Check to see if we need to exit after every file deletion
if s.ctx.Err() != nil {
return
}
for idx, ingestTask := range ingestTasks {
expectedFiles[idx] = ingestTask.FileName
}

if len(orphanFiles) > 0 {
log.Infof("Finished removing %d orphaned ingest files", len(orphanFiles))
}
go s.orphanedFileSweeper.Clear(s.ctx, expectedFiles)
}
}
Loading
Loading