Merge remote-tracking branch 'origin/main' into populate-audit-log-fields
superlinkx committed Jan 29, 2024
2 parents 3ac13c1 + a232198 commit fd32507
Showing 32 changed files with 1,642 additions and 286 deletions.
63 changes: 61 additions & 2 deletions cmd/api/src/analysis/ad/adcs_integration_test.go
@@ -489,7 +489,7 @@ func TestADCSESC3(t *testing.T) {
for _, enterpriseCA := range enterpriseCAs {
if cache.DoesCAChainProperlyToDomain(enterpriseCA, innerDomain) {
if err := ad2.PostADCSESC3(ctx, tx, outC, groupExpansions, enterpriseCA, innerDomain, cache); err != nil {
t.Logf("failed post processing for %s: %v", ad.ADCSESC1.String(), err)
t.Logf("failed post processing for %s: %v", ad.ADCSESC3.String(), err)
} else {
return nil
}
@@ -546,7 +546,7 @@ func TestADCSESC3(t *testing.T) {
for _, enterpriseCA := range enterpriseCAs {
if cache.DoesCAChainProperlyToDomain(enterpriseCA, innerDomain) {
if err := ad2.PostADCSESC3(ctx, tx, outC, groupExpansions, enterpriseCA, innerDomain, cache); err != nil {
t.Logf("failed post processing for %s: %v", ad.ADCSESC1.String(), err)
t.Logf("failed post processing for %s: %v", ad.ADCSESC3.String(), err)
} else {
return nil
}
@@ -584,6 +584,65 @@ func TestADCSESC3(t *testing.T) {
})
}

+func TestADCSESC9a(t *testing.T) {
+testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema())
+testContext.DatabaseTestWithSetup(func(harness *integration.HarnessDetails) error {
+harness.ESC9AHarness.Setup(testContext)
+return nil
+}, func(harness integration.HarnessDetails, db graph.Database) {
+operation := analysis.NewPostRelationshipOperation(context.Background(), db, "ADCS Post Process Test - ESC9a")
+
+groupExpansions, err := ad2.ExpandAllRDPLocalGroups(context.Background(), db)
+require.Nil(t, err)
+enterpriseCertAuthorities, err := ad2.FetchNodesByKind(context.Background(), db, ad.EnterpriseCA)
+require.Nil(t, err)
+certTemplates, err := ad2.FetchNodesByKind(context.Background(), db, ad.CertTemplate)
+require.Nil(t, err)
+domains, err := ad2.FetchNodesByKind(context.Background(), db, ad.Domain)
+require.Nil(t, err)
+
+cache := ad2.NewADCSCache()
+cache.BuildCache(context.Background(), db, enterpriseCertAuthorities, certTemplates)
+
+for _, domain := range domains {
+innerDomain := domain
+
+operation.Operation.SubmitReader(func(ctx context.Context, tx graph.Transaction, outC chan<- analysis.CreatePostRelationshipJob) error {
+if enterpriseCAs, err := ad2.FetchEnterpriseCAsTrustedForNTAuthToDomain(tx, innerDomain); err != nil {
+return err
+} else {
+for _, enterpriseCA := range enterpriseCAs {
+if cache.DoesCAChainProperlyToDomain(enterpriseCA, innerDomain) {
+if err := ad2.PostADCSESC9a(ctx, tx, outC, groupExpansions, enterpriseCA, innerDomain, cache); err != nil {
+t.Logf("failed post processing for %s: %v", ad.ADCSESC9a.String(), err)
+} else {
+return nil
+}
+}
+}
+}
+return nil
+})
+}
+operation.Done()
+
+db.ReadTransaction(context.Background(), func(tx graph.Transaction) error {
+if results, err := ops.FetchStartNodes(tx.Relationships().Filterf(func() graph.Criteria {
+return query.Kind(query.Relationship(), ad.ADCSESC9a)
+})); err != nil {
+t.Fatalf("error fetching esc9a edges in integration test; %v", err)
+} else {
+assert.Equal(t, 1, len(results))
+
+require.True(t, results.Contains(harness.ESC9AHarness.Attacker))
+
+}
+return nil
+})
+})
+
+}

func TestADCSESC6a(t *testing.T) {
testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema())

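The new TestADCSESC9a follows the same submit-and-collect shape as the other ADCS post-processing tests: one reader is submitted per domain, each reader streams candidate edges to a channel, and the test then asserts on the ADCSESC9a edges that were created. Below is a minimal, self-contained sketch of that fan-out pattern; every name in it is a hypothetical stand-in for the BloodHound internals (graph.Transaction, analysis.CreatePostRelationshipJob, and so on), not the real API.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-in for analysis.CreatePostRelationshipJob.
type postRelationshipJob struct {
	fromID, toID int
	kind         string
}

func main() {
	var (
		outC = make(chan postRelationshipJob)
		wg   sync.WaitGroup
	)

	for _, domain := range []int{1, 2, 3} {
		// Per-iteration copy, mirroring the test's innerDomain := domain
		// (needed before Go 1.22's per-iteration loop variables).
		innerDomain := domain
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A real reader would enumerate enterprise CAs that chain to
			// innerDomain and run the ESC post-processor for each.
			outC <- postRelationshipJob{fromID: innerDomain, toID: 100, kind: "ADCSESC9a"}
		}()
	}

	// Close the channel once every reader has returned, then drain it.
	go func() {
		wg.Wait()
		close(outC)
	}()

	for job := range outC {
		fmt.Printf("would create %s edge %d -> %d\n", job.kind, job.fromID, job.toID)
	}
}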
3 changes: 1 addition & 2 deletions cmd/api/src/api/v2/file_uploads.go
@@ -153,10 +153,9 @@ func (s Resources) EndFileUploadJob(response http.ResponseWriter, request *http.
api.HandleDatabaseError(request, response, err)
} else if fileUploadJob.Status != model.JobStatusRunning {
api.WriteErrorResponse(request.Context(), api.BuildErrorResponse(http.StatusBadRequest, "job must be in running status to end", request), response)
-} else if fileUploadJob, err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
+} else if err := fileupload.EndFileUploadJob(s.DB, fileUploadJob); err != nil {
api.HandleDatabaseError(request, response, err)
} else {
-s.TaskNotifier.NotifyOfFileUploadJobStatus(fileUploadJob)
response.WriteHeader(http.StatusOK)
}
}
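The fix above also removes a shadowed variable: the old else-if re-declared fileUploadJob with :=, and that branch-local copy existed only to feed the now-deleted TaskNotifier call. In Go, := inside an if/else-if chain always declares fresh variables scoped to that branch. A minimal sketch of the pitfall, using hypothetical names rather than the real fileupload API:

package main

import (
	"errors"
	"fmt"
)

type uploadJob struct{ status string }

// endJob mimics the old EndFileUploadJob shape: it returns an updated job
// alongside an error. (Hypothetical helper, not the real API.)
func endJob(j uploadJob) (uploadJob, error) {
	j.status = "ended"
	return j, errors.New("database unavailable")
}

func main() {
	job := uploadJob{status: "running"}

	// The := declares a NEW job variable visible only inside this branch;
	// the outer job is shadowed, not updated.
	if job, err := endJob(job); err != nil {
		fmt.Println("inside the branch:", job.status) // "ended"
	}

	fmt.Println("after the branch: ", job.status) // still "running"
}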
9 changes: 4 additions & 5 deletions cmd/api/src/api/v2/file_uploads_test.go
@@ -1,17 +1,17 @@
// Copyright 2023 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package v2_test
@@ -216,7 +216,6 @@ func TestResources_EndFileUploadJob(t *testing.T) {
Status: model.JobStatusRunning,
}, nil)
mockDB.EXPECT().UpdateFileUploadJob(gomock.Any()).Return(nil)
-mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any())
},
Test: func(output apitest.Output) {
apitest.StatusCode(output, http.StatusOK)
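The deleted EXPECT line is the test-side half of the same change: gomock treats every EXPECT() as a required call by default, so once EndFileUploadJob stopped notifying the tasker, a leftover expectation would fail the test at controller cleanup with a missing-call error. A sketch of that behavior, assuming a gomock-generated MockTasker; the import path, constructor name, and test name here are assumptions, not code from this commit:

package v2_test

import (
	"testing"

	"go.uber.org/mock/gomock"
)

func TestEndJob_NoNotification(t *testing.T) {
	// NewController with a *testing.T fails the test at cleanup if any
	// expectation goes unmet.
	ctrl := gomock.NewController(t)

	// NewMockTasker would be produced by mockgen from the Tasker interface;
	// it is not defined in this sketch.
	mockTasker := NewMockTasker(ctrl)

	// With the notifier call gone from EndFileUploadJob, this expectation is
	// never satisfied and the test fails with "missing call(s)". That is why
	// the EXPECT line above was removed. To allow-but-not-require the call:
	//   mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any()).AnyTimes()
	mockTasker.EXPECT().NotifyOfFileUploadJobStatus(gomock.Any())
}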
90 changes: 50 additions & 40 deletions cmd/api/src/daemons/datapipe/datapipe.go
@@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/specterops/bloodhound/cache"
@@ -40,24 +41,19 @@ const (
)

type Tasker interface {
-NotifyOfFileUploadJobStatus(task model.FileUploadJob)
RequestAnalysis()
GetStatus() model.DatapipeStatusWrapper
}

type Daemon struct {
-db database.Database
-graphdb graph.Database
-cache cache.Cache
-cfg config.Configuration
-analysisRequested bool
-tickInterval time.Duration
-status model.DatapipeStatusWrapper
-ctx context.Context
-fileUploadJobIDsUnderAnalysis []int64
-completedFileUploadJobIDs []int64
-
-lock *sync.Mutex
+db database.Database
+graphdb graph.Database
+cache cache.Cache
+cfg config.Configuration
+analysisRequested *atomic.Bool
+tickInterval time.Duration
+status model.DatapipeStatusWrapper
+ctx context.Context
+clearOrphanedFilesLock *sync.Mutex
}

@@ -67,14 +63,12 @@ func (s *Daemon) Name() string {

func NewDaemon(ctx context.Context, cfg config.Configuration, connections bootstrap.DatabaseConnections[*database.BloodhoundDB, *graph.DatabaseSwitch], cache cache.Cache, tickInterval time.Duration) *Daemon {
return &Daemon{
-db: connections.RDMS,
-graphdb: connections.Graph,
-cache: cache,
-cfg: cfg,
-ctx: ctx,
-
-analysisRequested: false,
-lock: &sync.Mutex{},
+db: connections.RDMS,
+graphdb: connections.Graph,
+cache: cache,
+cfg: cfg,
+ctx: ctx,
+analysisRequested: &atomic.Bool{},
+clearOrphanedFilesLock: &sync.Mutex{},
tickInterval: tickInterval,
status: model.DatapipeStatusWrapper{
@@ -93,42 +87,41 @@ func (s *Daemon) GetStatus() model.DatapipeStatusWrapper {
}

func (s *Daemon) getAnalysisRequested() bool {
-s.lock.Lock()
-defer s.lock.Unlock()
-return s.analysisRequested
+return s.analysisRequested.Load()
}

func (s *Daemon) setAnalysisRequested(requested bool) {
-s.lock.Lock()
-defer s.lock.Unlock()
-s.analysisRequested = requested
+s.analysisRequested.Store(requested)
}

func (s *Daemon) analyze() {
+// Ensure that the user-requested analysis switch is flipped back to false. This is done at the beginning of the
+// function so that any re-analysis requests are caught while analysis is in-progress.
+s.setAnalysisRequested(false)

if s.cfg.DisableAnalysis {
return
}

s.status.Update(model.DatapipeStatusAnalyzing, false)
log.Measure(log.LevelInfo, "Starting analysis")()
log.LogAndMeasure(log.LevelInfo, "Graph Analysis")()

if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil {
log.Errorf("Analysis failed: %v", err)
log.Errorf("Graph analysis failed: %v", err)
s.failJobsUnderAnalysis()

s.status.Update(model.DatapipeStatusIdle, false)
} else {
+s.completeJobsUnderAnalysis()

if entityPanelCachingFlag, err := s.db.GetFlagByKey(appcfg.FeatureEntityPanelCaching); err != nil {
log.Errorf("Error retrieving entity panel caching flag: %v", err)
} else {
resetCache(s.cache, entityPanelCachingFlag.Enabled)
}
-s.clearJobsFromAnalysis()
-log.Measure(log.LevelInfo, "Analysis run finished")()

s.status.Update(model.DatapipeStatusIdle, true)
}

-s.setAnalysisRequested(false)
}

func resetCache(cacher cache.Cache, cacheEnabled bool) {
@@ -143,10 +136,22 @@ func (s *Daemon) ingestAvailableTasks() {
if ingestTasks, err := s.db.GetAllIngestTasks(); err != nil {
log.Errorf("Failed fetching available ingest tasks: %v", err)
} else {
-s.processIngestTasks(ingestTasks)
+s.processIngestTasks(s.ctx, ingestTasks)
}
}

+func (s *Daemon) getNumJobsWaitingForAnalysis() (int, error) {
+numJobsWaitingForAnalysis := 0
+
+if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil {
+return 0, err
+} else {
+numJobsWaitingForAnalysis += len(fileUploadJobsUnderAnalysis)
+}
+
+return numJobsWaitingForAnalysis, nil
+}

func (s *Daemon) Start() {
var (
datapipeLoopTimer = time.NewTimer(s.tickInterval)
@@ -164,15 +169,20 @@
s.clearOrphanedData()

case <-datapipeLoopTimer.C:
+// Ingest all available ingest tasks
s.ingestAvailableTasks()

+// Manage time-out state progression for file upload jobs
fileupload.ProcessStaleFileUploadJobs(s.db)

-if s.numAvailableCompletedFileUploadJobs() > 0 {
-s.processCompletedFileUploadJobs()
-s.analyze()
-} else if s.getAnalysisRequested() {
+// Manage nominal state transitions for file upload jobs
+s.processIngestedFileUploadJobs()
+
+// If there are completed file upload jobs or if analysis was user-requested, perform analysis.
+if numJobsWaitingForAnalysis, err := s.getNumJobsWaitingForAnalysis(); err != nil {
+log.Errorf("Failed looking up jobs waiting for analysis: %v", err)
+} else if numJobsWaitingForAnalysis > 0 || s.getAnalysisRequested() {
s.analyze()
-} else {
-s.ingestAvailableTasks()
}

datapipeLoopTimer.Reset(s.tickInterval)
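The datapipe refactor swaps a mutex-guarded bool for sync/atomic's atomic.Bool (Go 1.19+), so reading and flipping the analysis-requested flag no longer takes a lock, and the remaining mutex keeps only the job it still has (guarding orphaned-file cleanup, hence the clearOrphanedFilesLock rename). A minimal before-and-after sketch of the idiom; the type and method names here are illustrative, not the daemon's:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Before: a bool that must be guarded by a mutex on every access.
type mutexFlag struct {
	mu        sync.Mutex
	requested bool
}

func (f *mutexFlag) Get() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.requested
}

func (f *mutexFlag) Set(v bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.requested = v
}

// After: atomic.Bool is safe for concurrent use with no lock; its zero
// value is false and ready to use.
type atomicFlag struct {
	requested atomic.Bool
}

func main() {
	var f atomicFlag
	f.requested.Store(true)
	fmt.Println(f.requested.Load()) // true

	// CompareAndSwap atomically "claims" a request, letting a consumer
	// read-and-clear the flag in one step.
	if f.requested.CompareAndSwap(true, false) {
		fmt.Println("claimed the analysis request")
	}
}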
