From 691668700a998886ee80947ed19bf447b89da749 Mon Sep 17 00:00:00 2001 From: John Hopper Date: Wed, 24 Jan 2024 14:35:35 -0800 Subject: [PATCH] chore: remove unnecessary complexity in the data pipeline --- cmd/api/src/api/v2/file_uploads.go | 3 +- cmd/api/src/api/v2/file_uploads_test.go | 9 ++- cmd/api/src/daemons/datapipe/datapipe.go | 56 +++++++++++-------- cmd/api/src/daemons/datapipe/jobs.go | 54 +++--------------- cmd/api/src/daemons/datapipe/mocks/mock.go | 12 ---- .../src/services/fileupload/file_upload.go | 9 +-- 6 files changed, 52 insertions(+), 91 deletions(-) diff --git a/cmd/api/src/api/v2/file_uploads.go b/cmd/api/src/api/v2/file_uploads.go index 0f73c1a94a..8a9512fa2e 100644 --- a/cmd/api/src/api/v2/file_uploads.go +++ b/cmd/api/src/api/v2/file_uploads.go @@ -153,10 +153,9 @@ func (s Resources) EndFileUploadJob(response http.ResponseWriter, request *http. api.HandleDatabaseError(request, response, err) } else if fileUploadJob.Status != model.JobStatusRunning { api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, "job must be in running status to end", request), response) - } else if fileUploadJob, err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil { + } else if err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil { api.HandleDatabaseError(request, response, err) } else { - s.TaskNotifier.NotifyOfFileUploadJobStatus(fileUploadJob) response.WriteHeader(http.StatusOK) } } diff --git a/cmd/api/src/api/v2/file_uploads_test.go b/cmd/api/src/api/v2/file_uploads_test.go index 571b6bce0a..f48aca60ed 100644 --- a/cmd/api/src/api/v2/file_uploads_test.go +++ b/cmd/api/src/api/v2/file_uploads_test.go @@ -1,17 +1,17 @@ // Copyright 2023 Specter Ops, Inc. -// +// // Licensed under the Apache License, Version 2.0 // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// +// // SPDX-License-Identifier: Apache-2.0 package v2_test @@ -216,7 +216,6 @@ func TestResources_EndFileUploadJob(t *testing.T) { Status: model.JobStatusRunning, }, nil) mockDB.EXPECT().UpdateFileUploadJob(gomock.Any()).Return(nil) - mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any()) }, Test: func(output apitest.Output) { apitest.StatusCode(output, http.StatusOK) diff --git a/cmd/api/src/daemons/datapipe/datapipe.go b/cmd/api/src/daemons/datapipe/datapipe.go index 2baf804def..ab3d4f5a0e 100644 --- a/cmd/api/src/daemons/datapipe/datapipe.go +++ b/cmd/api/src/daemons/datapipe/datapipe.go @@ -40,22 +40,20 @@ const ( ) type Tasker interface { - NotifyOfFileUploadJobStatus(task model.FileUploadJob) RequestAnalysis() GetStatus() model.DatapipeStatusWrapper } type Daemon struct { - db database.Database - graphdb graph.Database - cache cache.Cache - cfg config.Configuration - analysisRequested bool - tickInterval time.Duration - status model.DatapipeStatusWrapper - ctx context.Context - fileUploadJobIDsUnderAnalysis []int64 - completedFileUploadJobIDs []int64 + db database.Database + graphdb graph.Database + cache cache.Cache + cfg config.Configuration + analysisRequested bool + tickInterval time.Duration + status model.DatapipeStatusWrapper + ctx context.Context + fileUploadJobsUnderAnalysisByID []int64 lock *sync.Mutex clearOrphanedFilesLock *sync.Mutex @@ -104,16 +102,28 @@ func (s *Daemon) setAnalysisRequested(requested bool) { s.analysisRequested = requested } +func (s *Daemon) finishAnalysis() { + // Clear in-memory tracking of any file upload jobs that made it to this analysis run + s.fileUploadJobsUnderAnalysisByID = s.fileUploadJobsUnderAnalysisByID[:0] +} + func (s *Daemon) analyze() { + // Ensure that the user-requested analysis switch is flipped back to false. This is done at the beginning of the + // function so that any re-analysis requests are caught while analysis is in-progress. + s.setAnalysisRequested(false) + + // Make sure to clean up in-memory state on exit of this function. + defer s.finishAnalysis() + if s.cfg.DisableAnalysis { return } s.status.Update(model.DatapipeStatusAnalyzing, false) - log.Measure(log.LevelInfo, "Starting analysis")() + log.LogAndMeasure(log.LevelInfo, "Graph Analysis")() if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil { - log.Errorf("Analysis failed: %v", err) + log.Errorf("Graph analysis failed: %v", err) s.failJobsUnderAnalysis() s.status.Update(model.DatapipeStatusIdle, false) @@ -123,12 +133,9 @@ func (s *Daemon) analyze() { } else { resetCache(s.cache, entityPanelCachingFlag.Enabled) } - s.clearJobsFromAnalysis() - log.Measure(log.LevelInfo, "Analysis run finished")() + s.status.Update(model.DatapipeStatusIdle, true) } - - s.setAnalysisRequested(false) } func resetCache(cacher cache.Cache, cacheEnabled bool) { @@ -164,15 +171,18 @@ func (s *Daemon) Start() { s.clearOrphanedData() case <-datapipeLoopTimer.C: + // Ingest all available ingest tasks + s.ingestAvailableTasks() + + // Manage time-out state progression for file upload jobs fileupload.ProcessStaleFileUploadJobs(s.db) - if s.numAvailableCompletedFileUploadJobs() > 0 { - s.processCompletedFileUploadJobs() - s.analyze() - } else if s.getAnalysisRequested() { + // Manage nominal state transitions for file upload jobs + s.processCompletedFileUploadJobs() + + // If there are completed client or file upload jobs or if analysis was user-requested, perform analysis + if len(s.fileUploadJobsUnderAnalysisByID) > 0 || s.getAnalysisRequested() { s.analyze() - } else { - s.ingestAvailableTasks() } datapipeLoopTimer.Reset(s.tickInterval) diff --git a/cmd/api/src/daemons/datapipe/jobs.go b/cmd/api/src/daemons/datapipe/jobs.go index 908edd3a41..130ce5746c 100644 --- a/cmd/api/src/daemons/datapipe/jobs.go +++ b/cmd/api/src/daemons/datapipe/jobs.go @@ -25,56 +25,28 @@ import ( "github.com/specterops/bloodhound/src/services/fileupload" ) -func (s *Daemon) numAvailableCompletedFileUploadJobs() int { - s.lock.Lock() - defer s.lock.Unlock() - - return len(s.completedFileUploadJobIDs) -} - func (s *Daemon) failJobsUnderAnalysis() { - for _, jobID := range s.fileUploadJobIDsUnderAnalysis { + for _, jobID := range s.fileUploadJobsUnderAnalysisByID { if err := fileupload.FailFileUploadJob(s.db, jobID, "Analysis failed"); err != nil { log.Errorf("Failed updating job %d to failed status: %v", jobID, err) } } - - s.clearJobsFromAnalysis() -} - -func (s *Daemon) clearJobsFromAnalysis() { - s.lock.Lock() - s.fileUploadJobIDsUnderAnalysis = s.fileUploadJobIDsUnderAnalysis[:0] - s.lock.Unlock() } func (s *Daemon) processCompletedFileUploadJobs() { - completedJobIDs := s.getAndTransitionCompletedJobIDs() - - for _, id := range completedJobIDs { - if ingestTasks, err := s.db.GetIngestTasksForJob(id); err != nil { - log.Errorf("Failed fetching available ingest tasks: %v", err) - } else { - s.processIngestTasks(ingestTasks) - } + if completedFileUploadJobs, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil { + log.Errorf("Failed to look up finished file upload jobs: %v", err) + } else { + for _, completedFileUploadJob := range completedFileUploadJobs { + if err := fileupload.UpdateFileUploadJobStatus(s.db, completedFileUploadJob.ID, model.JobStatusComplete, "Complete"); err != nil { + log.Errorf("Error updating fileupload job %d: %v", completedFileUploadJob.ID, err) + } - if err := fileupload.UpdateFileUploadJobStatus(s.db, id, model.JobStatusComplete, "Complete"); err != nil { - log.Errorf("Error updating fileupload job %d: %v", id, err) + s.fileUploadJobsUnderAnalysisByID = append(s.fileUploadJobsUnderAnalysisByID, completedFileUploadJob.ID) } } } -func (s *Daemon) getAndTransitionCompletedJobIDs() []int64 { - s.lock.Lock() - defer s.lock.Unlock() - - // transition completed jobs to analysis - s.fileUploadJobIDsUnderAnalysis = append(s.fileUploadJobIDsUnderAnalysis, s.completedFileUploadJobIDs...) - s.completedFileUploadJobIDs = s.completedFileUploadJobIDs[:0] - - return s.fileUploadJobIDsUnderAnalysis -} - func (s *Daemon) processIngestTasks(ingestTasks model.IngestTasks) { s.status.Update(model.DatapipeStatusIngesting, false) defer s.status.Update(model.DatapipeStatusIdle, false) @@ -105,11 +77,3 @@ func (s *Daemon) clearTask(ingestTask model.IngestTask) { log.Errorf("Error removing task from db: %v", err) } } - -func (s *Daemon) NotifyOfFileUploadJobStatus(job model.FileUploadJob) { - if job.Status == model.JobStatusIngesting { - s.lock.Lock() - s.completedFileUploadJobIDs = append(s.completedFileUploadJobIDs, job.ID) - s.lock.Unlock() - } -} diff --git a/cmd/api/src/daemons/datapipe/mocks/mock.go b/cmd/api/src/daemons/datapipe/mocks/mock.go index 8148722ec5..2c9a4d61a9 100644 --- a/cmd/api/src/daemons/datapipe/mocks/mock.go +++ b/cmd/api/src/daemons/datapipe/mocks/mock.go @@ -64,18 +64,6 @@ func (mr *MockTaskerMockRecorder) GetStatus() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatus", reflect.TypeOf((*MockTasker)(nil).GetStatus)) } -// NotifyOfFileUploadJobStatus mocks base method. -func (m *MockTasker) NotifyOfFileUploadJobStatus(arg0 model.FileUploadJob) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "NotifyOfFileUploadJobStatus", arg0) -} - -// NotifyOfFileUploadJobStatus indicates an expected call of NotifyOfFileUploadJobStatus. -func (mr *MockTaskerMockRecorder) NotifyOfFileUploadJobStatus(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyOfFileUploadJobStatus", reflect.TypeOf((*MockTasker)(nil).NotifyOfFileUploadJobStatus), arg0) -} - // RequestAnalysis mocks base method. func (m *MockTasker) RequestAnalysis() { m.ctrl.T.Helper() diff --git a/cmd/api/src/services/fileupload/file_upload.go b/cmd/api/src/services/fileupload/file_upload.go index f1b7769cb5..09fd37be7c 100644 --- a/cmd/api/src/services/fileupload/file_upload.go +++ b/cmd/api/src/services/fileupload/file_upload.go @@ -110,13 +110,14 @@ func TouchFileUploadJobLastIngest(db FileUploadData, fileUploadJob model.FileUpl return db.UpdateFileUploadJob(fileUploadJob) } -func EndFileUploadJob(db FileUploadData, job model.FileUploadJob) (model.FileUploadJob, error) { +func EndFileUploadJob(db FileUploadData, job model.FileUploadJob) error { job.Status = model.JobStatusIngesting + if err := db.UpdateFileUploadJob(job); err != nil { - return job, fmt.Errorf("error ending file upload job: %w", err) - } else { - return job, nil + return fmt.Errorf("error ending file upload job: %w", err) } + + return nil } func UpdateFileUploadJobStatus(db FileUploadData, jobID int64, status model.JobStatus, message string) error {