Skip to content

Commit

Permalink
Datapipe State Transition Hardening and ADCS Post Processing Fixes (#363
Browse files Browse the repository at this point in the history
)

* chore+fix: add a guard for a job state transition race and push parallel operation creation in adcs down to avoid opening too many connections

* chore: testing
  • Loading branch information
zinic authored Jan 30, 2024
1 parent 1be5827 commit 3b4d780
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 44 deletions.
30 changes: 8 additions & 22 deletions cmd/api/src/daemons/datapipe/datapipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func (s *Daemon) analyze() {

if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
log.Errorf("Graph analysis failed: %v", err)
s.failJobsUnderAnalysis()
FailAnalyzedFileUploadJobs(s.ctx, s.db)

s.status.Update(model.DatapipeStatusIdle, false)
} else {
s.completeJobsUnderAnalysis()
CompleteAnalyzedFileUploadJobs(s.ctx, s.db)

if entityPanelCachingFlag, err := s.db.GetFlagByKey(appcfg.FeatureEntityPanelCaching); err != nil {
log.Errorf("Error retrieving entity panel caching flag: %v", err)
Expand Down Expand Up @@ -140,18 +140,6 @@ func (s *Daemon) ingestAvailableTasks() {
}
}

func (s *Daemon) getNumJobsWaitingForAnalysis() (int, error) {
numJobsWaitingForAnalysis := 0

if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
return 0, err
} else {
numJobsWaitingForAnalysis += len(fileUploadJobsUnderAnalysis)
}

return numJobsWaitingForAnalysis, nil
}

func (s *Daemon) Start() {
var (
datapipeLoopTimer = time.NewTimer(s.tickInterval)
Expand All @@ -173,15 +161,15 @@ func (s *Daemon) Start() {
s.ingestAvailableTasks()

// Manage time-out state progression for file upload jobs
fileupload.ProcessStaleFileUploadJobs(s.db)
fileupload.ProcessStaleFileUploadJobs(s.ctx, s.db)

// Manage nominal state transitions for file upload jobs
s.processIngestedFileUploadJobs()
ProcessIngestedFileUploadJobs(s.ctx, s.db)

// If there are completed file upload jobs or if analysis was user-requested, perform analysis.
if numJobsWaitingForAnalysis, err := s.getNumJobsWaitingForAnalysis(); err != nil {
if hasJobsWaitingForAnalysis, err := HasFileUploadJobsWaitingForAnalysis(s.db); err != nil {
log.Errorf("Failed looking up jobs waiting for analysis: %v", err)
} else if numJobsWaitingForAnalysis > 0 || s.getAnalysisRequested() {
} else if hasJobsWaitingForAnalysis || s.getAnalysisRequested() {
s.analyze()
}

Expand Down Expand Up @@ -228,11 +216,9 @@ func (s *Daemon) clearOrphanedData() {
log.Errorf("Failed removing file: %s", fullPath)
}

// Check to see if we need to shutdown after every file deletion
select {
case <-s.ctx.Done():
// Check to see if we need to exit after every file deletion
if s.ctx.Err() != nil {
return
default:
}
}

Expand Down
57 changes: 43 additions & 14 deletions cmd/api/src/daemons/datapipe/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package datapipe

import (
"context"
"github.com/specterops/bloodhound/src/database"
"os"

"github.com/specterops/bloodhound/dawgs/graph"
Expand All @@ -26,37 +27,67 @@ import (
"github.com/specterops/bloodhound/src/services/fileupload"
)

func (s *Daemon) failJobsUnderAnalysis() {
if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
func HasFileUploadJobsWaitingForAnalysis(db database.Database) (bool, error) {
if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
return false, err
} else {
return len(fileUploadJobsUnderAnalysis) > 0, nil
}
}

func FailAnalyzedFileUploadJobs(ctx context.Context, db database.Database) {
// Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not
// commit state transitions when we are shutting down.
if ctx.Err() != nil {
return
}

if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
log.Errorf("Failed to load file upload jobs under analysis: %v", err)
} else {
for _, job := range fileUploadJobsUnderAnalysis {
if err := fileupload.FailFileUploadJob(s.db, job.ID, "Analysis failed"); err != nil {
if err := fileupload.UpdateFileUploadJobStatus(db, job, model.JobStatusFailed, "Analysis failed"); err != nil {
log.Errorf("Failed updating file upload job %d to failed status: %v", job.ID, err)
}
}
}
}

func (s *Daemon) completeJobsUnderAnalysis() {
if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
func CompleteAnalyzedFileUploadJobs(ctx context.Context, db database.Database) {
// Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not
// commit state transitions when we are shutting down.
if ctx.Err() != nil {
return
}

if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
log.Errorf("Failed to load file upload jobs under analysis: %v", err)
} else {
for _, job := range fileUploadJobsUnderAnalysis {
if err := fileupload.UpdateFileUploadJobStatus(s.db, job, model.JobStatusComplete, "Complete"); err != nil {
if err := fileupload.UpdateFileUploadJobStatus(db, job, model.JobStatusComplete, "Complete"); err != nil {
log.Errorf("Error updating fileupload job %d: %v", job.ID, err)
}
}
}
}

func (s *Daemon) processIngestedFileUploadJobs() {
if ingestedFileUploadJobs, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil {
func ProcessIngestedFileUploadJobs(ctx context.Context, db database.Database) {
// Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not
// commit state transitions when shutting down.
if ctx.Err() != nil {
return
}

if ingestingFileUploadJobs, err := db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil {
log.Errorf("Failed to look up finished file upload jobs: %v", err)
} else {
for _, ingestedFileUploadJob := range ingestedFileUploadJobs {
if err := fileupload.UpdateFileUploadJobStatus(s.db, ingestedFileUploadJob, model.JobStatusAnalyzing, "Analyzing"); err != nil {
log.Errorf("Error updating fileupload job %d: %v", ingestedFileUploadJob.ID, err)
for _, ingestingFileUploadJob := range ingestingFileUploadJobs {
if remainingIngestTasks, err := db.GetIngestTasksForJob(ingestingFileUploadJob.ID); err != nil {
log.Errorf("Failed looking up remaining ingest tasks for file upload job %d: %v", ingestingFileUploadJob.ID, err)
} else if len(remainingIngestTasks) == 0 {
if err := fileupload.UpdateFileUploadJobStatus(db, ingestingFileUploadJob, model.JobStatusAnalyzing, "Analyzing"); err != nil {
log.Errorf("Error updating fileupload job %d: %v", ingestingFileUploadJob.ID, err)
}
}
}
}
Expand Down Expand Up @@ -97,10 +128,8 @@ func (s *Daemon) processIngestTasks(ctx context.Context, ingestTasks model.Inges
for _, ingestTask := range ingestTasks {
// Check the context to see if we should continue processing ingest tasks. This has to be explicit since error
// handling assumes that all failures should be logged and not returned.
select {
case <-ctx.Done():
if ctx.Err() != nil {
return
default:
}

if err := s.processIngestFile(ctx, ingestTask.FileName); err != nil {
Expand Down
133 changes: 133 additions & 0 deletions cmd/api/src/daemons/datapipe/jobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package datapipe_test

import (
"context"
"github.com/specterops/bloodhound/src/daemons/datapipe"
"github.com/specterops/bloodhound/src/database/mocks"
"github.com/specterops/bloodhound/src/model"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
)

func TestHasJobsWaitingForAnalysis(t *testing.T) {
var (
mockCtrl = gomock.NewController(t)
dbMock = mocks.NewMockDatabase(mockCtrl)
)

defer mockCtrl.Finish()

t.Run("Has Jobs Waiting for Analysis", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{}}, nil)

hasJobs, err := datapipe.HasFileUploadJobsWaitingForAnalysis(dbMock)

require.True(t, hasJobs)
require.Nil(t, err)
})

t.Run("Has No Jobs Waiting for Analysis", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{}, nil)

hasJobs, err := datapipe.HasFileUploadJobsWaitingForAnalysis(dbMock)

require.False(t, hasJobs)
require.Nil(t, err)
})
}

func TestFailAnalyzedFileUploadJobs(t *testing.T) {
const jobID int64 = 1

var (
mockCtrl = gomock.NewController(t)
dbMock = mocks.NewMockDatabase(mockCtrl)
)

defer mockCtrl.Finish()

t.Run("Fail Analyzed File Upload Jobs", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{
BigSerial: model.BigSerial{
ID: jobID,
},
Status: model.JobStatusAnalyzing,
}}, nil)

dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error {
require.Equal(t, model.JobStatusFailed, fileUploadJob.Status)
return nil
})

datapipe.FailAnalyzedFileUploadJobs(context.Background(), dbMock)
})
}

func TestCompleteAnalyzedFileUploadJobs(t *testing.T) {
const jobID int64 = 1

var (
mockCtrl = gomock.NewController(t)
dbMock = mocks.NewMockDatabase(mockCtrl)
)

defer mockCtrl.Finish()

t.Run("Complete Analyzed File Upload Jobs", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{
BigSerial: model.BigSerial{
ID: jobID,
},
Status: model.JobStatusAnalyzing,
}}, nil)

dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error {
require.Equal(t, model.JobStatusComplete, fileUploadJob.Status)
return nil
})

datapipe.CompleteAnalyzedFileUploadJobs(context.Background(), dbMock)
})
}

func TestProcessIngestedFileUploadJobs(t *testing.T) {
const jobID int64 = 1

var (
mockCtrl = gomock.NewController(t)
dbMock = mocks.NewMockDatabase(mockCtrl)
)

defer mockCtrl.Finish()

t.Run("Transition Jobs with No Remaining Ingest Tasks", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusIngesting).Return([]model.FileUploadJob{{
BigSerial: model.BigSerial{
ID: jobID,
},
Status: model.JobStatusIngesting,
}}, nil)

dbMock.EXPECT().GetIngestTasksForJob(jobID).Return([]model.IngestTask{}, nil)
dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error {
require.Equal(t, model.JobStatusAnalyzing, fileUploadJob.Status)
return nil
})

datapipe.ProcessIngestedFileUploadJobs(context.Background(), dbMock)
})

t.Run("Don't Transition Jobs with Remaining Ingest Tasks", func(t *testing.T) {
dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusIngesting).Return([]model.FileUploadJob{{
BigSerial: model.BigSerial{
ID: jobID,
},
Status: model.JobStatusIngesting,
}}, nil)

dbMock.EXPECT().GetIngestTasksForJob(jobID).Return([]model.IngestTask{{}}, nil)

datapipe.ProcessIngestedFileUploadJobs(context.Background(), dbMock)
})
}
9 changes: 8 additions & 1 deletion cmd/api/src/services/fileupload/file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package fileupload

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -42,7 +43,13 @@ type FileUploadData interface {
GetFileUploadJobsWithStatus(status model.JobStatus) ([]model.FileUploadJob, error)
}

func ProcessStaleFileUploadJobs(db FileUploadData) {
func ProcessStaleFileUploadJobs(ctx context.Context, db FileUploadData) {
// Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not
// commit state transitions when shutting down.
if ctx.Err() != nil {
return
}

var (
now = time.Now().UTC()
threshold = now.Add(-jobActivityTimeout)
Expand Down
10 changes: 3 additions & 7 deletions packages/go/analysis/ad/adcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,29 +449,25 @@ func PostADCS(ctx context.Context, db graph.Database, groupExpansions impact.Pat
if !adcsEnabled {
return &analysis.AtomicPostProcessingStats{}, nil
}
operation := analysis.NewPostRelationshipOperation(ctx, db, "ADCS Post Processing")

if enterpriseCertAuthorities, err := FetchNodesByKind(ctx, db, ad.EnterpriseCA); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching enterpriseCA nodes: %w", err)
} else if rootCertAuthorities, err := FetchNodesByKind(ctx, db, ad.RootCA); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching rootCA nodes: %w", err)
} else if certTemplates, err := FetchNodesByKind(ctx, db, ad.CertTemplate); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching cert template nodes: %w", err)
} else if domains, err := FetchNodesByKind(ctx, db, ad.Domain); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching domain nodes: %w", err)
} else if step1Stats, err := postADCSPreProcessStep1(ctx, db, enterpriseCertAuthorities, rootCertAuthorities); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed adcs pre-processing step 1: %w", err)
} else if step2Stats, err := postADCSPreProcessStep2(ctx, db, certTemplates); err != nil {
operation.Done()
return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed adcs pre-processing step 2: %w", err)
} else {
operation := analysis.NewPostRelationshipOperation(ctx, db, "ADCS Post Processing")

operation.Stats.Merge(step1Stats)
operation.Stats.Merge(step2Stats)

var cache = NewADCSCache()
cache.BuildCache(ctx, db, enterpriseCertAuthorities, certTemplates)

Expand Down

0 comments on commit 3b4d780

Please sign in to comment.