
Commit

chore: remove unnecessary complexity in the data pipeline
zinic committed Jan 26, 2024
1 parent f099e37 commit 6916687
Showing 6 changed files with 52 additions and 91 deletions.
3 changes: 1 addition & 2 deletions cmd/api/src/api/v2/file_uploads.go
@@ -153,10 +153,9 @@ func (s Resources) EndFileUploadJob(response http.ResponseWriter, request *http.Request) {
 		api.HandleDatabaseError(request, response, err)
 	} else if fileUploadJob.Status != model.JobStatusRunning {
 		api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, "job must be in running status to end", request), response)
-	} else if fileUploadJob, err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
+	} else if err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
 		api.HandleDatabaseError(request, response, err)
 	} else {
-		s.TaskNotifier.NotifyOfFileUploadJobStatus(fileUploadJob)
 		response.WriteHeader(http.StatusOK)
 	}
 }
9 changes: 4 additions & 5 deletions cmd/api/src/api/v2/file_uploads_test.go
@@ -1,17 +1,17 @@
 // Copyright 2023 Specter Ops, Inc.
-//
+//
 // Licensed under the Apache License, Version 2.0
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
-//
+//
 //     http://www.apache.org/licenses/LICENSE-2.0
-//
+//
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-//
+//
 // SPDX-License-Identifier: Apache-2.0
 
 package v2_test
@@ -216,7 +216,6 @@ func TestResources_EndFileUploadJob(t *testing.T) {
 					Status: model.JobStatusRunning,
 				}, nil)
 				mockDB.EXPECT().UpdateFileUploadJob(gomock.Any()).Return(nil)
-				mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any())
 			},
 			Test: func(output apitest.Output) {
 				apitest.StatusCode(output, http.StatusOK)
56 changes: 33 additions & 23 deletions cmd/api/src/daemons/datapipe/datapipe.go
@@ -40,22 +40,20 @@ const (
 )
 
 type Tasker interface {
-	NotifyOfFileUploadJobStatus(task model.FileUploadJob)
 	RequestAnalysis()
 	GetStatus() model.DatapipeStatusWrapper
 }
 
 type Daemon struct {
-	db                            database.Database
-	graphdb                       graph.Database
-	cache                         cache.Cache
-	cfg                           config.Configuration
-	analysisRequested             bool
-	tickInterval                  time.Duration
-	status                        model.DatapipeStatusWrapper
-	ctx                           context.Context
-	fileUploadJobIDsUnderAnalysis []int64
-	completedFileUploadJobIDs     []int64
+	db                              database.Database
+	graphdb                         graph.Database
+	cache                           cache.Cache
+	cfg                             config.Configuration
+	analysisRequested               bool
+	tickInterval                    time.Duration
+	status                          model.DatapipeStatusWrapper
+	ctx                             context.Context
+	fileUploadJobsUnderAnalysisByID []int64
 
 	lock                   *sync.Mutex
 	clearOrphanedFilesLock *sync.Mutex
@@ -104,16 +104,28 @@ func (s *Daemon) setAnalysisRequested(requested bool) {
 	s.analysisRequested = requested
 }
 
+func (s *Daemon) finishAnalysis() {
+	// Clear in-memory tracking of any file upload jobs that made it to this analysis run
+	s.fileUploadJobsUnderAnalysisByID = s.fileUploadJobsUnderAnalysisByID[:0]
+}
+
 func (s *Daemon) analyze() {
+	// Ensure that the user-requested analysis switch is flipped back to false. This is done at the beginning of the
+	// function so that any re-analysis requests are caught while analysis is in-progress.
+	s.setAnalysisRequested(false)
+
+	// Make sure to clean up in-memory state on exit of this function.
+	defer s.finishAnalysis()
+
 	if s.cfg.DisableAnalysis {
 		return
 	}
 
 	s.status.Update(model.DatapipeStatusAnalyzing, false)
-	log.Measure(log.LevelInfo, "Starting analysis")()
+	log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()
 
 	if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
-		log.Errorf("Analysis failed: %v", err)
+		log.Errorf("Graph analysis failed: %v", err)
 		s.failJobsUnderAnalysis()
 
 		s.status.Update(model.DatapipeStatusIdle, false)
@@ -123,12 +123,9 @@ func (s *Daemon) analyze() {
 		} else {
 			resetCache(s.cache, entityPanelCachingFlag.Enabled)
 		}
-		s.clearJobsFromAnalysis()
-		log.Measure(log.LevelInfo, "Analysis run finished")()
 
 		s.status.Update(model.DatapipeStatusIdle, true)
 	}
-
-	s.setAnalysisRequested(false)
 }
 
 func resetCache(cacher cache.Cache, cacheEnabled bool) {
@@ -164,15 +164,18 @@ func (s *Daemon) Start() {
 			s.clearOrphanedData()
 
 		case <-datapipeLoopTimer.C:
-			// Ingest all available ingest tasks
-			s.ingestAvailableTasks()
-
+			// Manage time-out state progression for file upload jobs
 			fileupload.ProcessStaleFileUploadJobs(s.db)
 
-			if s.numAvailableCompletedFileUploadJobs() > 0 {
-				s.processCompletedFileUploadJobs()
-				s.analyze()
-			} else if s.getAnalysisRequested() {
+			// Manage nominal state transitions for file upload jobs
+			s.processCompletedFileUploadJobs()
+
+			// If there are completed client or file upload jobs or if analysis was user-requested, perform analysis
+			if len(s.fileUploadJobsUnderAnalysisByID) > 0 || s.getAnalysisRequested() {
 				s.analyze()
+			} else {
+				s.ingestAvailableTasks()
 			}
 
 			datapipeLoopTimer.Reset(s.tickInterval)
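The reworked Start loop is where the removed complexity shows up most clearly: each tick now runs a fixed sequence (time-out progression, then nominal state transitions, then a single analyze-or-ingest decision), and since the under-analysis slice is now only touched from this one goroutine, the notification path and its locking helpers could be deleted. Below is a minimal runnable sketch of the per-tick decision; the names daemon, tick, and jobsUnderAnalysisByID are illustrative stand-ins, not the project's real types:

package main

import "fmt"

// daemon carries just enough state for the sketch; the real Daemon also holds
// the database, graph, cache, and status fields shown in the diff above.
type daemon struct {
	analysisRequested     bool
	jobsUnderAnalysisByID []int64
}

// tick mirrors the new per-tick sequence: record completed jobs first, then
// either analyze or fall back to ingesting available tasks.
func (d *daemon) tick(completedJobIDs []int64) {
	// Stand-in for processCompletedFileUploadJobs: completed jobs are tracked in memory.
	d.jobsUnderAnalysisByID = append(d.jobsUnderAnalysisByID, completedJobIDs...)

	if len(d.jobsUnderAnalysisByID) > 0 || d.analysisRequested {
		d.analysisRequested = false // analyze() clears the request flag up front
		fmt.Println("analyze jobs:", d.jobsUnderAnalysisByID)
		d.jobsUnderAnalysisByID = d.jobsUnderAnalysisByID[:0] // finishAnalysis() clears tracking on exit
	} else {
		fmt.Println("ingest available tasks")
	}
}

func main() {
	d := &daemon{}
	d.tick(nil)           // nothing completed: ingest
	d.tick([]int64{1, 2}) // completed jobs: analyze, then clear
	d.tick(nil)           // tracking was cleared: back to ingesting
}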
54 changes: 9 additions & 45 deletions cmd/api/src/daemons/datapipe/jobs.go
@@ -25,56 +25,28 @@ import (
 	"github.com/specterops/bloodhound/src/services/fileupload"
 )
 
-func (s *Daemon) numAvailableCompletedFileUploadJobs() int {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	return len(s.completedFileUploadJobIDs)
-}
-
 func (s *Daemon) failJobsUnderAnalysis() {
-	for _, jobID := range s.fileUploadJobIDsUnderAnalysis {
+	for _, jobID := range s.fileUploadJobsUnderAnalysisByID {
 		if err := fileupload.FailFileUploadJob(s.db, jobID, "Analysis failed"); err != nil {
 			log.Errorf("Failed updating job %d to failed status: %v", jobID, err)
 		}
 	}
-
-	s.clearJobsFromAnalysis()
 }
 
-func (s *Daemon) clearJobsFromAnalysis() {
-	s.lock.Lock()
-	s.fileUploadJobIDsUnderAnalysis = s.fileUploadJobIDsUnderAnalysis[:0]
-	s.lock.Unlock()
-}
-
 func (s *Daemon) processCompletedFileUploadJobs() {
-	completedJobIDs := s.getAndTransitionCompletedJobIDs()
-
-	for _, id := range completedJobIDs {
-		if ingestTasks, err := s.db.GetIngestTasksForJob(id); err != nil {
-			log.Errorf("Failed fetching available ingest tasks: %v", err)
-		} else {
-			s.processIngestTasks(ingestTasks)
-		}
+	if completedFileUploadJobs, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil {
+		log.Errorf("Failed to look up finished file upload jobs: %v", err)
+	} else {
+		for _, completedFileUploadJob := range completedFileUploadJobs {
+			if err := fileupload.UpdateFileUploadJobStatus(s.db, completedFileUploadJob.ID, model.JobStatusComplete, "Complete"); err != nil {
+				log.Errorf("Error updating fileupload job %d: %v", completedFileUploadJob.ID, err)
+			}
 
-		if err := fileupload.UpdateFileUploadJobStatus(s.db, id, model.JobStatusComplete, "Complete"); err != nil {
-			log.Errorf("Error updating fileupload job %d: %v", id, err)
+			s.fileUploadJobsUnderAnalysisByID = append(s.fileUploadJobsUnderAnalysisByID, completedFileUploadJob.ID)
 		}
 	}
 }
-
-func (s *Daemon) getAndTransitionCompletedJobIDs() []int64 {
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	// transition completed jobs to analysis
-	s.fileUploadJobIDsUnderAnalysis = append(s.fileUploadJobIDsUnderAnalysis, s.completedFileUploadJobIDs...)
-	s.completedFileUploadJobIDs = s.completedFileUploadJobIDs[:0]
-
-	return s.fileUploadJobIDsUnderAnalysis
-}
 
 func (s *Daemon) processIngestTasks(ingestTasks model.IngestTasks) {
 	s.status.Update(model.DatapipeStatusIngesting, false)
 	defer s.status.Update(model.DatapipeStatusIdle, false)
@@ -105,11 +77,3 @@ func (s *Daemon) clearTask(ingestTask model.IngestTask) {
 		log.Errorf("Error removing task from db: %v", err)
 	}
 }
-
-func (s *Daemon) NotifyOfFileUploadJobStatus(job model.FileUploadJob) {
-	if job.Status == model.JobStatusIngesting {
-		s.lock.Lock()
-		s.completedFileUploadJobIDs = append(s.completedFileUploadJobIDs, job.ID)
-		s.lock.Unlock()
-	}
-}
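The deleted helpers make the before-and-after easy to state: previously the API side pushed completed job IDs to the daemon via NotifyOfFileUploadJobStatus, which shuttled them between two mutex-guarded slices; now processCompletedFileUploadJobs simply asks the database for every job already sitting in the ingesting status. A small self-contained sketch of that polling hand-off, with a hypothetical Store standing in for the project's database layer:

package main

import "fmt"

type JobStatus string

const (
	StatusIngesting JobStatus = "ingesting" // upload finished, ingestion pending
	StatusComplete  JobStatus = "complete"
)

type Job struct {
	ID     int64
	Status JobStatus
}

// Store stands in for the shared database that both the API and the datapipe see.
type Store struct{ jobs []*Job }

// JobsWithStatus plays the role of db.GetFileUploadJobsWithStatus: the status
// column itself is the work queue, so no in-memory hand-off is needed.
func (s *Store) JobsWithStatus(status JobStatus) []*Job {
	var out []*Job
	for _, j := range s.jobs {
		if j.Status == status {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	store := &Store{jobs: []*Job{{ID: 1, Status: StatusIngesting}, {ID: 2, Status: StatusComplete}}}

	var underAnalysis []int64
	for _, job := range store.JobsWithStatus(StatusIngesting) {
		job.Status = StatusComplete // stand-in for UpdateFileUploadJobStatus(..., JobStatusComplete, "Complete")
		underAnalysis = append(underAnalysis, job.ID)
	}
	fmt.Println("jobs queued for analysis:", underAnalysis) // [1]
}

Because persisted status is the source of truth, the API and the datapipe no longer need a shared in-process channel, which is what let the lock-protected slices and their accessors be deleted.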
12 changes: 0 additions & 12 deletions cmd/api/src/daemons/datapipe/mocks/mock.go

Some generated files are not rendered by default.

9 changes: 5 additions & 4 deletions cmd/api/src/services/fileupload/file_upload.go
@@ -110,13 +110,14 @@ func TouchFileUploadJobLastIngest(db FileUploadData, fileUploadJob model.FileUploadJob) error {
 	return db.UpdateFileUploadJob(fileUploadJob)
 }
 
-func EndFileUploadJob(db FileUploadData, job model.FileUploadJob) (model.FileUploadJob, error) {
+func EndFileUploadJob(db FileUploadData, job model.FileUploadJob) error {
 	job.Status = model.JobStatusIngesting
 
 	if err := db.UpdateFileUploadJob(job); err != nil {
-		return job, fmt.Errorf("error ending file upload job: %w", err)
-	} else {
-		return job, nil
+		return fmt.Errorf("error ending file upload job: %w", err)
 	}
+
+	return nil
 }
 
 func UpdateFileUploadJobStatus(db FileUploadData, jobID int64, status model.JobStatus, message string) error {
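With the updated job no longer returned, the handler's only question is whether the status write succeeded, and the %w verb keeps the underlying database error available to errors.Is / errors.As. A hedged, self-contained sketch of the new calling convention, using simplified stand-ins for model.FileUploadJob and the FileUploadData interface:

package main

import (
	"errors"
	"fmt"
)

type JobStatus string

const JobStatusIngesting JobStatus = "ingesting"

// FileUploadJob is a pared-down stand-in for the real model type.
type FileUploadJob struct {
	ID     int64
	Status JobStatus
}

// FileUploadData mirrors the single method EndFileUploadJob needs.
type FileUploadData interface {
	UpdateFileUploadJob(job FileUploadJob) error
}

var errDatabase = errors.New("connection lost")

// flakyDB is a stub that always fails the persistence call, to show error flow.
type flakyDB struct{}

func (flakyDB) UpdateFileUploadJob(FileUploadJob) error { return errDatabase }

// EndFileUploadJob follows the simplified signature: mutate, persist, return only an error.
func EndFileUploadJob(db FileUploadData, job FileUploadJob) error {
	job.Status = JobStatusIngesting
	if err := db.UpdateFileUploadJob(job); err != nil {
		return fmt.Errorf("error ending file upload job: %w", err)
	}
	return nil
}

func main() {
	err := EndFileUploadJob(flakyDB{}, FileUploadJob{ID: 42})
	fmt.Println(err)                         // error ending file upload job: connection lost
	fmt.Println(errors.Is(err, errDatabase)) // true: %w keeps the cause inspectable
}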
