Merge remote-tracking branch 'origin/main' into BED-4032
rvazarkar committed Jan 29, 2024
2 parents: db3ccf1 + a232198 · commit: 4d7ddb6
Showing 22 changed files with 682 additions and 243 deletions.
3 changes: 1 addition & 2 deletions cmd/api/src/api/v2/file_uploads.go
@@ -153,10 +153,9 @@ func (s Resources) EndFileUploadJob(response http.ResponseWriter, request *http.Request) {
 		api.HandleDatabaseError(request, response, err)
 	} else if fileUploadJob.Status != model.JobStatusRunning {
 		api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, "job must be in running status to end", request), response)
-	} else if fileUploadJob, err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
+	} else if err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
 		api.HandleDatabaseError(request, response, err)
 	} else {
-		s.TaskNotifier.NotifyOfFileUploadJobStatus(fileUploadJob)
 		response.WriteHeader(http.StatusOK)
 	}
 }
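Net effect of this file's change: ending a job no longer pushes a notification into the datapipe; the daemon now discovers finished jobs by polling job status in the database (see datapipe.go and jobs.go below). A toy, self-contained sketch of that poll-driven shape — every name here is an illustrative stand-in, not the project's API:

package main

import "fmt"

type JobStatus int

const (
	StatusRunning JobStatus = iota
	StatusIngesting
	StatusAnalyzing
	StatusComplete
)

type Job struct {
	ID     int64
	Status JobStatus
}

// Store stands in for the shared database that both sides talk to.
type Store struct{ jobs map[int64]*Job }

// EndJob is the handler-side transition (Running -> Ingesting); note that it
// returns only an error, mirroring the new EndFileUploadJob signature.
func (s *Store) EndJob(id int64) error {
	job, ok := s.jobs[id]
	if !ok || job.Status != StatusRunning {
		return fmt.Errorf("job %d must be in running status to end", id)
	}
	job.Status = StatusIngesting
	return nil
}

// JobsWithStatus is the datapipe-side poll, analogous in spirit to
// GetFileUploadJobsWithStatus in the diffs below.
func (s *Store) JobsWithStatus(status JobStatus) []*Job {
	var matched []*Job
	for _, job := range s.jobs {
		if job.Status == status {
			matched = append(matched, job)
		}
	}
	return matched
}

func main() {
	store := &Store{jobs: map[int64]*Job{1: {ID: 1, Status: StatusRunning}}}

	// API side: end the job. No notification is sent anywhere.
	if err := store.EndJob(1); err != nil {
		fmt.Println(err)
	}

	// Datapipe side: the next tick finds the job purely by its stored status.
	for _, job := range store.JobsWithStatus(StatusIngesting) {
		job.Status = StatusAnalyzing
	}

	fmt.Println(store.jobs[1].Status == StatusAnalyzing) // true
}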
9 changes: 4 additions & 5 deletions cmd/api/src/api/v2/file_uploads_test.go
@@ -1,17 +1,17 @@
 // Copyright 2023 Specter Ops, Inc.
-// 
+//
 // Licensed under the Apache License, Version 2.0
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
-// 
+//
 //     http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-// 
+//
 // SPDX-License-Identifier: Apache-2.0
 
 package v2_test

@@ -216,7 +216,6 @@ func TestResources_EndFileUploadJob(t *testing.T) {
 				Status: model.JobStatusRunning,
 			}, nil)
 			mockDB.EXPECT().UpdateFileUploadJob(gomock.Any()).Return(nil)
-			mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any())
 		},
 		Test: func(output apitest.Output) {
 			apitest.StatusCode(output, http.StatusOK)
90 changes: 50 additions & 40 deletions cmd/api/src/daemons/datapipe/datapipe.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/specterops/bloodhound/cache"
@@ -40,24 +41,19 @@ const (
 )
 
 type Tasker interface {
-	NotifyOfFileUploadJobStatus(task model.FileUploadJob)
 	RequestAnalysis()
 	GetStatus() model.DatapipeStatusWrapper
 }
 
 type Daemon struct {
-	db                            database.Database
-	graphdb                       graph.Database
-	cache                         cache.Cache
-	cfg                           config.Configuration
-	analysisRequested             bool
-	tickInterval                  time.Duration
-	status                        model.DatapipeStatusWrapper
-	ctx                           context.Context
-	fileUploadJobIDsUnderAnalysis []int64
-	completedFileUploadJobIDs     []int64
-
-	lock *sync.Mutex
+	db                     database.Database
+	graphdb                graph.Database
+	cache                  cache.Cache
+	cfg                    config.Configuration
+	analysisRequested      *atomic.Bool
+	tickInterval           time.Duration
+	status                 model.DatapipeStatusWrapper
+	ctx                    context.Context
+	clearOrphanedFilesLock *sync.Mutex
 }

@@ -67,14 +63,12 @@ func (s *Daemon) Name() string {
 
 func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootstrap.DatabaseConnections[*database.BloodhoundDB, *graph.DatabaseSwitch], cache cache.Cache, tickInterval time.Duration) *Daemon {
 	return &Daemon{
-		db:      connections.RDMS,
-		graphdb: connections.Graph,
-		cache:   cache,
-		cfg:     cfg,
-		ctx:     ctx,
-
-		analysisRequested: false,
-		lock:              &sync.Mutex{},
+		db:                     connections.RDMS,
+		graphdb:                connections.Graph,
+		cache:                  cache,
+		cfg:                    cfg,
+		ctx:                    ctx,
+		analysisRequested:      &atomic.Bool{},
+		clearOrphanedFilesLock: &sync.Mutex{},
 		tickInterval:           tickInterval,
 		status: model.DatapipeStatusWrapper{
@@ -93,42 +87,41 @@ func (s *Daemon) GetStatus() model.DatapipeStatusWrapper {
 }
 
 func (s *Daemon) getAnalysisRequested() bool {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	return s.analysisRequested
+	return s.analysisRequested.Load()
 }
 
 func (s *Daemon) setAnalysisRequested(requested bool) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	s.analysisRequested = requested
+	s.analysisRequested.Store(requested)
 }
 
 func (s *Daemon) analyze() {
+	// Ensure that the user-requested analysis switch is flipped back to false. This is done at the beginning of the
+	// function so that any re-analysis requests are caught while analysis is in-progress.
+	s.setAnalysisRequested(false)
+
 	if s.cfg.DisableAnalysis {
 		return
 	}
 
 	s.status.Update(model.DatapipeStatusAnalyzing, false)
-	log.Measure(log.LevelInfo, "Starting analysis")()
+	log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()
 
 	if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
-		log.Errorf("Analysis failed: %v", err)
+		log.Errorf("Graph analysis failed: %v", err)
 		s.failJobsUnderAnalysis()
 
 		s.status.Update(model.DatapipeStatusIdle, false)
 	} else {
+		s.completeJobsUnderAnalysis()
+
 		if entityPanelCachingFlag, err := s.db.GetFlagByKey(appcfg.FeatureEntityPanelCaching); err != nil {
 			log.Errorf("Error retrieving entity panel caching flag: %v", err)
 		} else {
 			resetCache(s.cache, entityPanelCachingFlag.Enabled)
 		}
-		s.clearJobsFromAnalysis()
-		log.Measure(log.LevelInfo, "Analysis run finished")()
 
 		s.status.Update(model.DatapipeStatusIdle, true)
 	}
-
-	s.setAnalysisRequested(false)
 }
 
 func resetCache(cacher cache.Cache, cacheEnabled bool) {
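The swap above replaces a mutex-guarded bool with sync/atomic's atomic.Bool (Go 1.19+), leaving clearOrphanedFilesLock as the daemon's only remaining mutex. A minimal sketch of the two equivalent forms, with illustrative names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mutexFlag is the old shape: every read and write must take the lock.
type mutexFlag struct {
	mu  sync.Mutex
	val bool
}

func (f *mutexFlag) Set(v bool) { f.mu.Lock(); f.val = v; f.mu.Unlock() }

func (f *mutexFlag) Get() bool { f.mu.Lock(); defer f.mu.Unlock(); return f.val }

// atomicFlag is the new shape: a single word with lock-free loads and stores,
// so the flag cannot deadlock or be left locked on an early return.
type atomicFlag struct{ val atomic.Bool }

func (f *atomicFlag) Set(v bool) { f.val.Store(v) }

func (f *atomicFlag) Get() bool { return f.val.Load() }

func main() {
	var m mutexFlag
	var a atomicFlag
	m.Set(true)
	a.Set(true)
	fmt.Println(m.Get(), a.Get()) // true true
}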
@@ -143,10 +136,22 @@ func (s *Daemon) ingestAvailableTasks() {
 	if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
 		log.Errorf("Failed fetching available ingest tasks: %v", err)
 	} else {
-		s.processIngestTasks(ingestTasks)
+		s.processIngestTasks(s.ctx, ingestTasks)
 	}
 }
 
+func (s *Daemon) getNumJobsWaitingForAnalysis() (int, error) {
+	numJobsWaitingForAnalysis := 0
+
+	if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
+		return 0, err
+	} else {
+		numJobsWaitingForAnalysis += len(fileUploadJobsUnderAnalysis)
+	}
+
+	return numJobsWaitingForAnalysis, nil
+}
+
 func (s *Daemon) Start() {
 	var (
 		datapipeLoopTimer = time.NewTimer(s.tickInterval)
@@ -164,15 +169,20 @@ func (s *Daemon) Start() {
 			s.clearOrphanedData()
 
 		case <-datapipeLoopTimer.C:
+			// Ingest all available ingest tasks
+			s.ingestAvailableTasks()
+
 			// Manage time-out state progression for file upload jobs
 			fileupload.ProcessStaleFileUploadJobs(s.db)
 
-			if s.numAvailableCompletedFileUploadJobs() > 0 {
-				s.processCompletedFileUploadJobs()
-				s.analyze()
-			} else if s.getAnalysisRequested() {
+			// Manage nominal state transitions for file upload jobs
+			s.processIngestedFileUploadJobs()
+
+			// If there are completed file upload jobs or if analysis was user-requested, perform analysis.
+			if numJobsWaitingForAnalysis, err := s.getNumJobsWaitingForAnalysis(); err != nil {
+				log.Errorf("Failed looking up jobs waiting for analysis: %v", err)
+			} else if numJobsWaitingForAnalysis > 0 || s.getAnalysisRequested() {
 				s.analyze()
-			} else {
-				s.ingestAvailableTasks()
 			}
 
 			datapipeLoopTimer.Reset(s.tickInterval)
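Each timer fire now runs the stages in a fixed order — ingest available tasks, expire stale jobs, promote ingested jobs to analyzing, then analyze if any job is waiting or a user requested it. A reduced sketch of that loop shape; the helper functions are stand-ins for the daemon's methods, and the real Start() also services channels hidden by the truncated diff:

package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, tick time.Duration) {
	timer := time.NewTimer(tick)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-timer.C:
			ingestAvailableTasks() // stand-in for s.ingestAvailableTasks()
			expireStaleJobs()      // stand-in for fileupload.ProcessStaleFileUploadJobs(s.db)
			promoteIngestedJobs()  // stand-in for s.processIngestedFileUploadJobs()

			if waiting, err := jobsWaitingForAnalysis(); err != nil {
				fmt.Println("lookup failed:", err)
			} else if waiting > 0 || analysisRequested() {
				analyze()
			}

			// Reset only after the pass completes, so a slow pass delays the
			// next tick instead of stacking work.
			timer.Reset(tick)
		}
	}
}

// Stand-ins so the sketch compiles; the real daemon hits the database.
func ingestAvailableTasks()                {}
func expireStaleJobs()                     {}
func promoteIngestedJobs()                 {}
func jobsWaitingForAnalysis() (int, error) { return 0, nil }
func analysisRequested() bool              { return false }
func analyze()                             {}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	run(ctx, 10*time.Millisecond)
}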
117 changes: 60 additions & 57 deletions cmd/api/src/daemons/datapipe/jobs.go
@@ -17,6 +17,7 @@
 package datapipe
 
 import (
+	"context"
 	"os"
 
 	"github.com/specterops/bloodhound/dawgs/graph"
@@ -25,78 +26,88 @@ import (
 	"github.com/specterops/bloodhound/src/services/fileupload"
 )
 
-func (s *Daemon) numAvailableCompletedFileUploadJobs() int {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	return len(s.completedFileUploadJobIDs)
-}
-
 func (s *Daemon) failJobsUnderAnalysis() {
-	for _, jobID := range s.fileUploadJobIDsUnderAnalysis {
-		if err := fileupload.FailFileUploadJob(s.db, jobID, "Analysis failed"); err != nil {
-			log.Errorf("Failed updating job %d to failed status: %v", jobID, err)
+	if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
+		log.Errorf("Failed to load file upload jobs under analysis: %v", err)
+	} else {
+		for _, job := range fileUploadJobsUnderAnalysis {
+			if err := fileupload.FailFileUploadJob(s.db, job.ID, "Analysis failed"); err != nil {
+				log.Errorf("Failed updating file upload job %d to failed status: %v", job.ID, err)
+			}
 		}
 	}
-
-	s.clearJobsFromAnalysis()
-}
-
-func (s *Daemon) clearJobsFromAnalysis() {
-	s.lock.Lock()
-	s.fileUploadJobIDsUnderAnalysis = s.fileUploadJobIDsUnderAnalysis[:0]
-	s.lock.Unlock()
 }
 
-func (s *Daemon) processCompletedFileUploadJobs() {
-	completedJobIDs := s.getAndTransitionCompletedJobIDs()
-
-	for _, id := range completedJobIDs {
-		if ingestTasks, err := s.db.GetIngestTasksForJob(id); err != nil {
-			log.Errorf("Failed fetching available ingest tasks: %v", err)
-		} else {
-			s.processIngestTasks(ingestTasks)
-		}
-
-		if err := fileupload.UpdateFileUploadJobStatus(s.db, id, model.JobStatusComplete, "Complete"); err != nil {
-			log.Errorf("Error updating fileupload job %d: %v", id, err)
+func (s *Daemon) completeJobsUnderAnalysis() {
+	if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
+		log.Errorf("Failed to load file upload jobs under analysis: %v", err)
+	} else {
+		for _, job := range fileUploadJobsUnderAnalysis {
+			if err := fileupload.UpdateFileUploadJobStatus(s.db, job, model.JobStatusComplete, "Complete"); err != nil {
+				log.Errorf("Error updating fileupload job %d: %v", job.ID, err)
+			}
 		}
 	}
 }
 
-func (s *Daemon) getAndTransitionCompletedJobIDs() []int64 {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	// transition completed jobs to analysis
-	s.fileUploadJobIDsUnderAnalysis = append(s.fileUploadJobIDsUnderAnalysis, s.completedFileUploadJobIDs...)
-	s.completedFileUploadJobIDs = s.completedFileUploadJobIDs[:0]
-
-	return s.fileUploadJobIDsUnderAnalysis
+func (s *Daemon) processIngestedFileUploadJobs() {
+	if ingestedFileUploadJobs, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil {
+		log.Errorf("Failed to look up finished file upload jobs: %v", err)
+	} else {
+		for _, ingestedFileUploadJob := range ingestedFileUploadJobs {
+			if err := fileupload.UpdateFileUploadJobStatus(s.db, ingestedFileUploadJob, model.JobStatusAnalyzing, "Analyzing"); err != nil {
+				log.Errorf("Error updating fileupload job %d: %v", ingestedFileUploadJob.ID, err)
+			}
+		}
+	}
+}
+
+// clearFileTask removes a generic file upload task for ingested data.
+func (s *Daemon) clearFileTask(ingestTask model.IngestTask) {
+	if err := s.db.DeleteIngestTask(ingestTask); err != nil {
+		log.Errorf("Error removing file upload task from db: %v", err)
+	}
+}
+
+func (s *Daemon) processIngestFile(ctx context.Context, path string) error {
+	if jsonFile, err := os.Open(path); err != nil {
+		return err
+	} else {
+		defer func() {
+			if err := jsonFile.Close(); err != nil {
+				log.Errorf("Failed closing ingest file %s: %v", path, err)
+			}
+		}()
+
+		return s.graphdb.BatchOperation(ctx, func(batch graph.Batch) error {
+			if err := s.ReadWrapper(batch, jsonFile); err != nil {
+				return err
+			} else {
+				return nil
+			}
+		})
+	}
 }
 
-func (s *Daemon) processIngestTasks(ingestTasks model.IngestTasks) {
+// processIngestTasks covers the generic file upload case for ingested data.
+func (s *Daemon) processIngestTasks(ctx context.Context, ingestTasks model.IngestTasks) {
 	s.status.Update(model.DatapipeStatusIngesting, false)
 	defer s.status.Update(model.DatapipeStatusIdle, false)
 
 	for _, ingestTask := range ingestTasks {
-		jsonFile, err := os.Open(ingestTask.FileName)
-		if err != nil {
-			log.Errorf("Error reading file for ingest task %v: %v", ingestTask.ID, err)
-		}
-
-		if err = s.graphdb.BatchOperation(s.ctx, func(batch graph.Batch) error {
-			if err := s.ReadWrapper(batch, jsonFile); err != nil {
-				return err
-			} else {
-				return nil
-			}
-		}); err != nil {
-			log.Errorf("Error processing ingest task %v: %v", ingestTask.ID, err)
+		// Check the context to see if we should continue processing ingest tasks. This has to be explicit since error
+		// handling assumes that all failures should be logged and not returned.
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		if err := s.processIngestFile(ctx, ingestTask.FileName); err != nil {
+			log.Errorf("Failed processing ingest task %d with file %s: %v", ingestTask.ID, ingestTask.FileName, err)
 		}
 
-		s.clearTask(ingestTask)
-		jsonFile.Close()
+		s.clearFileTask(ingestTask)
	}
 }

@@ -105,11 +116,3 @@ func (s *Daemon) clearTask(ingestTask model.IngestTask) {
 		log.Errorf("Error removing task from db: %v", err)
 	}
 }
-
-func (s *Daemon) NotifyOfFileUploadJobStatus(job model.FileUploadJob) {
-	if job.Status == model.JobStatusIngesting {
-		s.lock.Lock()
-		s.completedFileUploadJobIDs = append(s.completedFileUploadJobIDs, job.ID)
-		s.lock.Unlock()
-	}
-}
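Two patterns in the new jobs.go are worth noting: processIngestFile scopes each file to a defer-based close, so the handle is released even when the batch operation fails, and processIngestTasks checks its context non-blockingly at the top of every iteration because errors are logged rather than returned, leaving cancellation as the only early exit. A condensed, self-contained sketch of both together — process is a hypothetical stand-in for the graph batch work:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
)

// processFile mirrors processIngestFile: open, defer-close with a logged
// error, then hand the reader to the real work.
func processFile(ctx context.Context, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer func() {
		if err := f.Close(); err != nil {
			log.Printf("failed closing ingest file %s: %v", path, err)
		}
	}()
	return process(ctx, f)
}

// process is a stand-in for graphdb.BatchOperation + ReadWrapper.
func process(ctx context.Context, f *os.File) error { return nil }

func processAll(ctx context.Context, paths []string) {
	for _, path := range paths {
		// Non-blocking cancellation check, as in processIngestTasks.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := processFile(ctx, path); err != nil {
			log.Printf("failed processing %s: %v", path, err)
		}
	}
}

func main() {
	processAll(context.Background(), []string{os.DevNull})
	fmt.Println("done")
}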
12 changes: 0 additions & 12 deletions cmd/api/src/daemons/datapipe/mocks/mock.go

Some generated files are not rendered by default.