From 8e4173616aeab2aa051a4e5b88eefee33d1d8e36 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 26 Mar 2024 15:53:18 -0700 Subject: [PATCH 01/14] added integration tests for db switch endpoints --- cmd/api/src/api/tools/pg_test.go | 119 ++++++++++++++++++++++ cmd/api/src/test/integration/harnesses.go | 18 ++++ docker-compose.testing.yml | 2 +- 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 cmd/api/src/api/tools/pg_test.go diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go new file mode 100644 index 0000000000..73e8cd1889 --- /dev/null +++ b/cmd/api/src/api/tools/pg_test.go @@ -0,0 +1,119 @@ +package tools + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/specterops/bloodhound/dawgs/drivers/neo4j" + "github.com/specterops/bloodhound/dawgs/drivers/pg" + "github.com/specterops/bloodhound/dawgs/graph" + graph_mocks "github.com/specterops/bloodhound/dawgs/graph/mocks" + "github.com/specterops/bloodhound/graphschema" + "github.com/specterops/bloodhound/src/test/integration/utils" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +// func TestPGMigrator(t *testing.T) { +// var ( +// mockCtrl = gomock.NewController(t) +// ctx = context.Background() +// schema = graphschema.DefaultGraphSchema() +// graphDB = graph_mocks.NewMockDatabase(mockCtrl) +// dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) +// ) + +// integration.SetupDB(t) + +// cfg, err := utils.LoadIntegrationTestConfig() +// require.Nil(t, err) + +// migrator := NewPGMigrator(ctx, cfg, schema, dbSwitch) + +// if err := migrator.startMigration(); err != nil { +// log.Errorf("migration failed to start with error: %w", err) +// } + +// for i := 0; i < 5; i++ { +// log.Infof("migration state: %v", migrator.state) +// time.Sleep(1000 * 1000 * 500) +// } + +// require.Nil(t, true) +// } + +func setupTestMigrator(t *testing.T, ctx context.Context) (*PGMigrator, error) { + var ( + mockCtrl = gomock.NewController(t) + schema = graphschema.DefaultGraphSchema() + graphDB = graph_mocks.NewMockDatabase(mockCtrl) + dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) + ) + + if cfg, err := utils.LoadIntegrationTestConfig(); err != nil { + return nil, err + } else { + return NewPGMigrator(ctx, cfg, schema, dbSwitch), nil + } +} + +func TestSwitchPostgreSQL(t *testing.T) { + var ( + request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/pg", nil) + recorder = httptest.NewRecorder() + ctx = request.Context() + ) + + migrator, err := setupTestMigrator(t, ctx) + require.Nil(t, err) + + err = SetGraphDriver(migrator.serverCtx, migrator.cfg, neo4j.DriverName) + require.Nil(t, err) + + migrator.SwitchPostgreSQL(recorder, request) + + response := recorder.Result() + defer response.Body.Close() + + driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + require.Nil(t, err) + require.Equal(t, pg.DriverName, driver) +} + +func TestSwitchNeo4j(t *testing.T) { + var ( + request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/neo4j", nil) + recorder = httptest.NewRecorder() + ctx = request.Context() + ) + + migrator, err := setupTestMigrator(t, ctx) + require.Nil(t, err) + + err = SetGraphDriver(migrator.serverCtx, migrator.cfg, pg.DriverName) + require.Nil(t, err) + + migrator.SwitchNeo4j(recorder, request) + + response := recorder.Result() + defer response.Body.Close() + + driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + require.Nil(t, err) + require.Equal(t, neo4j.DriverName, driver) +} + +// basic steps for runbook: +// +// 1. GET request to /pg-migration/status should return { "state": "idle" } +// +// 2. PUT request to /pg-migration/start starts the migration process +// +// 3. Poll with GET request to /pg-migration/status to see when migration has finished +// - should return { "state": "migrating" } if process has not completed yet +// - currently, errors will only surface in the API logs +// +// 4. Once migration has completed, switch db driver to postgres with PUT to /graph-db/switch/pg +// - Possible to toggle back to neo4j with PUT to /graph-db/switch/neo4j diff --git a/cmd/api/src/test/integration/harnesses.go b/cmd/api/src/test/integration/harnesses.go index 24c2d92d88..3fa62f2f31 100644 --- a/cmd/api/src/test/integration/harnesses.go +++ b/cmd/api/src/test/integration/harnesses.go @@ -6243,6 +6243,23 @@ func (s *ESC4ECA) Setup(graphTestContext *GraphTestContext) { graphTestContext.NewRelationship(s.Computer7, s.CertTemplate7, ad.GenericAll) } +type DBMigrateHarness struct { + Group1 *graph.Node + Computer1 *graph.Node + User1 *graph.Node +} + +func (s *DBMigrateHarness) Setup(graphTestContext *GraphTestContext) { + sid := RandomDomainSID() + s.Group1 = graphTestContext.NewActiveDirectoryGroup("GROUP ONE", sid) + s.Computer1 = graphTestContext.NewActiveDirectoryComputer("COMPUTER ONE", sid) + s.User1 = graphTestContext.NewActiveDirectoryUser("USER ONE", sid, false) + + graphTestContext.NewRelationship(s.Group1, s.Computer1, ad.GenericAll) + graphTestContext.NewRelationship(s.Computer1, s.User1, ad.HasSession) + graphTestContext.NewRelationship(s.User1, s.Group1, ad.MemberOf) +} + type HarnessDetails struct { RDP RDPHarness RDPB RDPHarness2 @@ -6316,4 +6333,5 @@ type HarnessDetails struct { ESC4Template3 ESC4Template3 ESC4Template4 ESC4Template4 ESC4ECA ESC4ECA + DBMigrateHarness DBMigrateHarness } diff --git a/docker-compose.testing.yml b/docker-compose.testing.yml index cc052ce963..b5a4cb9cd6 100644 --- a/docker-compose.testing.yml +++ b/docker-compose.testing.yml @@ -18,7 +18,7 @@ version: '3' services: testdb: restart: unless-stopped - image: docker.io/library/postgres:13.2 + image: docker.io/library/postgres:16.2 command: "-c log_statement=all" environment: - POSTGRES_USER=bloodhound From 68be1fbf5fd60dc3fbe2a257f89f8151bf36e87e Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Thu, 28 Mar 2024 16:54:26 -0700 Subject: [PATCH 02/14] testing for 3 api handlers --- cmd/api/src/api/tools/pg_test.go | 159 +++++++++++++++++++------------ 1 file changed, 97 insertions(+), 62 deletions(-) diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index 73e8cd1889..0a5c8c7710 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -2,118 +2,153 @@ package tools import ( "context" + "encoding/json" "net/http" "net/http/httptest" "testing" + "time" "github.com/specterops/bloodhound/dawgs/drivers/neo4j" "github.com/specterops/bloodhound/dawgs/drivers/pg" "github.com/specterops/bloodhound/dawgs/graph" graph_mocks "github.com/specterops/bloodhound/dawgs/graph/mocks" "github.com/specterops/bloodhound/graphschema" + "github.com/specterops/bloodhound/log" + "github.com/specterops/bloodhound/src/test/integration" "github.com/specterops/bloodhound/src/test/integration/utils" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) -// func TestPGMigrator(t *testing.T) { -// var ( -// mockCtrl = gomock.NewController(t) -// ctx = context.Background() -// schema = graphschema.DefaultGraphSchema() -// graphDB = graph_mocks.NewMockDatabase(mockCtrl) -// dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) -// ) - -// integration.SetupDB(t) - -// cfg, err := utils.LoadIntegrationTestConfig() -// require.Nil(t, err) - -// migrator := NewPGMigrator(ctx, cfg, schema, dbSwitch) - -// if err := migrator.startMigration(); err != nil { -// log.Errorf("migration failed to start with error: %w", err) -// } - -// for i := 0; i < 5; i++ { -// log.Infof("migration state: %v", migrator.state) -// time.Sleep(1000 * 1000 * 500) -// } - -// require.Nil(t, true) -// } - -func setupTestMigrator(t *testing.T, ctx context.Context) (*PGMigrator, error) { +func TestSwitchPostgreSQL(t *testing.T) { var ( mockCtrl = gomock.NewController(t) - schema = graphschema.DefaultGraphSchema() graphDB = graph_mocks.NewMockDatabase(mockCtrl) - dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) - ) - - if cfg, err := utils.LoadIntegrationTestConfig(); err != nil { - return nil, err - } else { - return NewPGMigrator(ctx, cfg, schema, dbSwitch), nil - } -} - -func TestSwitchPostgreSQL(t *testing.T) { - var ( request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/pg", nil) recorder = httptest.NewRecorder() ctx = request.Context() ) - migrator, err := setupTestMigrator(t, ctx) - require.Nil(t, err) + migrator := setupTestMigrator(t, ctx, graphDB) - err = SetGraphDriver(migrator.serverCtx, migrator.cfg, neo4j.DriverName) + // lookup sets the driver from config and creates the database_switch table if needed + driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) + // Set starting value to neo4j + if driver != neo4j.DriverName { + err = SetGraphDriver(migrator.serverCtx, migrator.cfg, neo4j.DriverName) + require.Nil(t, err) + } + migrator.SwitchPostgreSQL(recorder, request) response := recorder.Result() defer response.Body.Close() - driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + driver, err = LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) require.Equal(t, pg.DriverName, driver) } func TestSwitchNeo4j(t *testing.T) { var ( + mockCtrl = gomock.NewController(t) + graphDB = graph_mocks.NewMockDatabase(mockCtrl) request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/neo4j", nil) recorder = httptest.NewRecorder() ctx = request.Context() ) - migrator, err := setupTestMigrator(t, ctx) - require.Nil(t, err) + migrator := setupTestMigrator(t, ctx, graphDB) - err = SetGraphDriver(migrator.serverCtx, migrator.cfg, pg.DriverName) + // lookup sets the driver from config and creates the database_switch table if needed + driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) + // Set starting value to pg + if driver != pg.DriverName { + err = SetGraphDriver(migrator.serverCtx, migrator.cfg, pg.DriverName) + require.Nil(t, err) + } + migrator.SwitchNeo4j(recorder, request) response := recorder.Result() defer response.Body.Close() - driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + driver, err = LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) require.Equal(t, neo4j.DriverName, driver) } -// basic steps for runbook: -// -// 1. GET request to /pg-migration/status should return { "state": "idle" } -// -// 2. PUT request to /pg-migration/start starts the migration process -// -// 3. Poll with GET request to /pg-migration/status to see when migration has finished -// - should return { "state": "migrating" } if process has not completed yet -// - currently, errors will only surface in the API logs -// -// 4. Once migration has completed, switch db driver to postgres with PUT to /graph-db/switch/pg -// - Possible to toggle back to neo4j with PUT to /graph-db/switch/neo4j +func TestPGMigrator(t *testing.T) { + testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema()) + integration.SetupDB(t) + + testContext.DatabaseTestWithSetup(func(harness *integration.HarnessDetails) error { + harness.DBMigrateHarness.Setup(testContext) + return nil + }, func(harness integration.HarnessDetails, graphDB graph.Database) { + var ( + request = httptest.NewRequest(http.MethodPut, "/pg-migrate/start", nil) + recorder = httptest.NewRecorder() + migrator = setupTestMigrator(t, testContext.Context(), graphDB) + ) + + // Start migration process + migrator.MigrationStart(recorder, request) + response := recorder.Result() + defer response.Body.Close() + + require.Equal(t, http.StatusAccepted, response.StatusCode) + + // Poll migration status handler until we see an "idle" status + for { + if migratorState := checkMigrationStatus(t, migrator); migratorState == stateMigrating { + log.Infof("Migration in progress, waiting 1 second...") + time.Sleep(1000 * 1000 * 100) + } else if migratorState == stateIdle { + break + } else { + t.Fatalf("Encountered invalid migration status: %s", migratorState) + } + } + + // TODO: validate nodes/edges/types in pg + }) +} + +func checkMigrationStatus(t *testing.T, migrator *PGMigrator) MigratorState { + var ( + request = httptest.NewRequest(http.MethodGet, "/pg-migrate/status", nil) + recorder = httptest.NewRecorder() + body struct{ State MigratorState } + ) + + migrator.MigrationStatus(recorder, request) + + response := recorder.Result() + defer response.Body.Close() + + require.Equal(t, http.StatusOK, response.StatusCode) + + if err := json.NewDecoder(response.Body).Decode(&body); err != nil { + t.Fatal("failed to decode json") + } + require.NotEmpty(t, body.State) + + return body.State +} + +func setupTestMigrator(t *testing.T, ctx context.Context, graphDB graph.Database) *PGMigrator { + var ( + schema = graphschema.DefaultGraphSchema() + dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) + ) + + cfg, err := utils.LoadIntegrationTestConfig() + require.Nil(t, err) + + return NewPGMigrator(ctx, cfg, schema, dbSwitch) +} From 44ce2a08a51e60277940fbcad07e83c59ca61aa6 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Mon, 1 Apr 2024 12:38:50 -0700 Subject: [PATCH 03/14] added cancellation test --- cmd/api/src/api/tools/pg_test.go | 66 +++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index 0a5c8c7710..9c1581d292 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -8,10 +8,12 @@ import ( "testing" "time" + "github.com/specterops/bloodhound/dawgs" "github.com/specterops/bloodhound/dawgs/drivers/neo4j" "github.com/specterops/bloodhound/dawgs/drivers/pg" "github.com/specterops/bloodhound/dawgs/graph" graph_mocks "github.com/specterops/bloodhound/dawgs/graph/mocks" + "github.com/specterops/bloodhound/dawgs/util/size" "github.com/specterops/bloodhound/graphschema" "github.com/specterops/bloodhound/log" "github.com/specterops/bloodhound/src/test/integration" @@ -27,15 +29,13 @@ func TestSwitchPostgreSQL(t *testing.T) { request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/pg", nil) recorder = httptest.NewRecorder() ctx = request.Context() + migrator = setupTestMigrator(t, ctx, graphDB) ) - migrator := setupTestMigrator(t, ctx, graphDB) - - // lookup sets the driver from config and creates the database_switch table if needed + // lookup creates the database_switch table if needed driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) - // Set starting value to neo4j if driver != neo4j.DriverName { err = SetGraphDriver(migrator.serverCtx, migrator.cfg, neo4j.DriverName) require.Nil(t, err) @@ -58,15 +58,12 @@ func TestSwitchNeo4j(t *testing.T) { request = httptest.NewRequest(http.MethodPut, "/graph-db/switch/neo4j", nil) recorder = httptest.NewRecorder() ctx = request.Context() + migrator = setupTestMigrator(t, ctx, graphDB) ) - migrator := setupTestMigrator(t, ctx, graphDB) - - // lookup sets the driver from config and creates the database_switch table if needed driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) require.Nil(t, err) - // Set starting value to pg if driver != pg.DriverName { err = SetGraphDriver(migrator.serverCtx, migrator.cfg, pg.DriverName) require.Nil(t, err) @@ -82,6 +79,30 @@ func TestSwitchNeo4j(t *testing.T) { require.Equal(t, neo4j.DriverName, driver) } +func TestCancelMigration(t *testing.T) { + var ( + mockCtrl = gomock.NewController(t) + graphDB = graph_mocks.NewMockDatabase(mockCtrl) + request = httptest.NewRequest(http.MethodPut, "/pg-migrate/cancel", nil) + recorder = httptest.NewRecorder() + ctx, cancelFunc = context.WithCancel(request.Context()) + migrator = setupTestMigrator(t, ctx, graphDB) + ) + + // This seems kinda hacky + migrator.migrationCancelFunc = cancelFunc + migrator.advanceState(stateMigrating, stateIdle) + + migrator.MigrationCancel(recorder, request) + + response := recorder.Result() + defer response.Body.Close() + + require.Equal(t, http.StatusAccepted, response.StatusCode) + + require.Nil(t, true) +} + func TestPGMigrator(t *testing.T) { testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema()) integration.SetupDB(t) @@ -89,11 +110,11 @@ func TestPGMigrator(t *testing.T) { testContext.DatabaseTestWithSetup(func(harness *integration.HarnessDetails) error { harness.DBMigrateHarness.Setup(testContext) return nil - }, func(harness integration.HarnessDetails, graphDB graph.Database) { + }, func(harness integration.HarnessDetails, neo4jDB graph.Database) { var ( request = httptest.NewRequest(http.MethodPut, "/pg-migrate/start", nil) recorder = httptest.NewRecorder() - migrator = setupTestMigrator(t, testContext.Context(), graphDB) + migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) ) // Start migration process @@ -106,8 +127,8 @@ func TestPGMigrator(t *testing.T) { // Poll migration status handler until we see an "idle" status for { if migratorState := checkMigrationStatus(t, migrator); migratorState == stateMigrating { - log.Infof("Migration in progress, waiting 1 second...") - time.Sleep(1000 * 1000 * 100) + log.Infof("Migration in progress, waiting...") + time.Sleep(1000 * 1000 * 100) // 1/10th of a second } else if migratorState == stateIdle { break } else { @@ -115,7 +136,26 @@ func TestPGMigrator(t *testing.T) { } } - // TODO: validate nodes/edges/types in pg + // WIP: validate nodes/edges/types in pg + + pgDB, err := dawgs.Open(migrator.serverCtx, pg.DriverName, dawgs.Config{ + TraversalMemoryLimit: size.Gibibyte, + DriverCfg: migrator.cfg.Database.PostgreSQLConnectionString(), + }) + require.Nil(t, err) + + pgDB.ReadTransaction(migrator.serverCtx, func(tx graph.Transaction) error { + return tx.Nodes().Fetch(func(cursor graph.Cursor[*graph.Node]) error { + for next := range cursor.Chan() { + log.Infof("confirming node: %+v", next) + return nil + } + + return cursor.Error() + }) + + }) + require.Nil(t, true) }) } From fedffcc913a8240fd2a8bdc64311d55208b3f63b Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Wed, 24 Apr 2024 11:51:46 -0700 Subject: [PATCH 04/14] updated tests, minor refactor of migrator --- cmd/api/src/api/tools/pg.go | 152 ++++++++++--------- cmd/api/src/api/tools/pg_test.go | 169 +++++++++++----------- cmd/api/src/test/integration/harnesses.go | 44 ++++-- 3 files changed, 206 insertions(+), 159 deletions(-) diff --git a/cmd/api/src/api/tools/pg.go b/cmd/api/src/api/tools/pg.go index 3a0a4da0fe..fe3dd650bc 100644 --- a/cmd/api/src/api/tools/pg.go +++ b/cmd/api/src/api/tools/pg.go @@ -19,6 +19,9 @@ package tools import ( "context" "fmt" + "net/http" + "sync" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype" "github.com/specterops/bloodhound/dawgs" "github.com/specterops/bloodhound/dawgs/drivers/neo4j" @@ -28,16 +31,14 @@ import ( "github.com/specterops/bloodhound/log" "github.com/specterops/bloodhound/src/api" "github.com/specterops/bloodhound/src/config" - "net/http" - "sync" ) type MigratorState string const ( - stateIdle MigratorState = "idle" - stateMigrating MigratorState = "migrating" - stateCanceling MigratorState = "canceling" + StateIdle MigratorState = "idle" + StateMigrating MigratorState = "migrating" + StateCanceling MigratorState = "canceling" ) func migrateTypes(ctx context.Context, neoDB, pgDB graph.Database) error { @@ -49,42 +50,59 @@ func migrateTypes(ctx context.Context, neoDB, pgDB graph.Database) error { ) if err := neoDB.ReadTransaction(ctx, func(tx graph.Transaction) error { - var ( - nextKindStr string - result = tx.Raw("call db.labels();", nil) - ) + if nodeKinds, err := GetNeo4jNodeKinds(ctx, tx); err != nil { + return err + } else if edgeKinds, err := GetNeo4jEdgeKinds(ctx, tx); err != nil { + return err + } else { + neoNodeKinds = nodeKinds + neoEdgeKinds = edgeKinds + return nil + } + }); err != nil { + return err + } - for result.Next() { - if err := result.Scan(&nextKindStr); err != nil { - return err - } + return pgDB.WriteTransaction(ctx, func(tx graph.Transaction) error { + _, err := pgDB.(*pg.Driver).KindMapper().AssertKinds(tx, append(neoNodeKinds, neoEdgeKinds...)) + return err + }) +} - neoNodeKinds = append(neoNodeKinds, graph.StringKind(nextKindStr)) - } +func GetNeo4jNodeKinds(ctx context.Context, tx graph.Transaction) (graph.Kinds, error) { + var ( + nextKindStr string + kinds graph.Kinds + result = tx.Raw("call db.labels();", nil) + ) - if err := result.Error(); err != nil { - return err + for result.Next() { + if err := result.Scan(&nextKindStr); err != nil { + return nil, err } - result = tx.Raw("call db.relationshipTypes();", nil) + kinds = append(kinds, graph.StringKind(nextKindStr)) + } - for result.Next() { - if err := result.Scan(&nextKindStr); err != nil { - return err - } + return kinds, result.Error() +} - neoEdgeKinds = append(neoEdgeKinds, graph.StringKind(nextKindStr)) +func GetNeo4jEdgeKinds(ctx context.Context, tx graph.Transaction) (graph.Kinds, error) { + var ( + nextKindStr string + kinds graph.Kinds + result = tx.Raw("call db.relationshipTypes();", nil) + ) + + for result.Next() { + if err := result.Scan(&nextKindStr); err != nil { + return nil, err } - return nil - }); err != nil { - return err + kinds = append(kinds, graph.StringKind(nextKindStr)) } - return pgDB.WriteTransaction(ctx, func(tx graph.Transaction) error { - _, err := pgDB.(*pg.Driver).KindMapper().AssertKinds(tx, append(neoNodeKinds, neoEdgeKinds...)) - return err - }) + return kinds, result.Error() } func convertNeo4jProperties(properties *graph.Properties) error { @@ -188,21 +206,21 @@ func migrateEdges(ctx context.Context, neoDB, pgDB graph.Database, nodeIDMapping type PGMigrator struct { graphSchema graph.Schema graphDBSwitch *graph.DatabaseSwitch - serverCtx context.Context + ServerCtx context.Context migrationCancelFunc func() - state MigratorState + State MigratorState lock *sync.Mutex - cfg config.Configuration + Cfg config.Configuration } func NewPGMigrator(serverCtx context.Context, cfg config.Configuration, graphSchema graph.Schema, graphDBSwitch *graph.DatabaseSwitch) *PGMigrator { return &PGMigrator{ graphSchema: graphSchema, graphDBSwitch: graphDBSwitch, - serverCtx: serverCtx, - state: stateIdle, + ServerCtx: serverCtx, + State: StateIdle, lock: &sync.Mutex{}, - cfg: cfg, + Cfg: cfg, } } @@ -213,29 +231,26 @@ func (s *PGMigrator) advanceState(next MigratorState, validTransitions ...Migrat isValid := false for _, validTransition := range validTransitions { - if s.state == validTransition { + if s.State == validTransition { isValid = true break } } if !isValid { - return fmt.Errorf("migrator state is %s but expected one of: %v", s.state, validTransitions) + return fmt.Errorf("migrator state is %s but expected one of: %v", s.State, validTransitions) } - s.state = next + s.State = next return nil } func (s *PGMigrator) SwitchPostgreSQL(response http.ResponseWriter, request *http.Request) { - if pgDB, err := dawgs.Open(s.serverCtx, pg.DriverName, dawgs.Config{ - TraversalMemoryLimit: size.Gibibyte, - DriverCfg: s.cfg.Database.PostgreSQLConnectionString(), - }); err != nil { + if pgDB, err := s.OpenPostgresGraphConnection(); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": fmt.Errorf("failed connecting to PostgreSQL: %w", err), }, http.StatusInternalServerError, response) - } else if err := SetGraphDriver(request.Context(), s.cfg, pg.DriverName); err != nil { + } else if err := SetGraphDriver(request.Context(), s.Cfg, pg.DriverName); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": fmt.Errorf("failed updating graph database driver preferences: %w", err), }, http.StatusInternalServerError, response) @@ -248,14 +263,11 @@ func (s *PGMigrator) SwitchPostgreSQL(response http.ResponseWriter, request *htt } func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Request) { - if neo4jDB, err := dawgs.Open(s.serverCtx, neo4j.DriverName, dawgs.Config{ - TraversalMemoryLimit: size.Gibibyte, - DriverCfg: s.cfg.Neo4J.Neo4jConnectionString(), - }); err != nil { + if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": fmt.Errorf("failed connecting to Neo4j: %w", err), }, http.StatusInternalServerError, response) - } else if err := SetGraphDriver(request.Context(), s.cfg, neo4j.DriverName); err != nil { + } else if err := SetGraphDriver(request.Context(), s.Cfg, neo4j.DriverName); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": fmt.Errorf("failed updating graph database driver preferences: %w", err), }, http.StatusInternalServerError, response) @@ -267,23 +279,17 @@ func (s *PGMigrator) SwitchNeo4j(response http.ResponseWriter, request *http.Req } } -func (s *PGMigrator) startMigration() error { - if err := s.advanceState(stateMigrating, stateIdle); err != nil { +func (s *PGMigrator) StartMigration() error { + if err := s.advanceState(StateMigrating, StateIdle); err != nil { return fmt.Errorf("database migration state error: %w", err) - } else if neo4jDB, err := dawgs.Open(s.serverCtx, neo4j.DriverName, dawgs.Config{ - TraversalMemoryLimit: size.Gibibyte, - DriverCfg: s.cfg.Neo4J.Neo4jConnectionString(), - }); err != nil { + } else if neo4jDB, err := s.OpenNeo4jGraphConnection(); err != nil { return fmt.Errorf("failed connecting to Neo4j: %w", err) - } else if pgDB, err := dawgs.Open(s.serverCtx, pg.DriverName, dawgs.Config{ - TraversalMemoryLimit: size.Gibibyte, - DriverCfg: s.cfg.Database.PostgreSQLConnectionString(), - }); err != nil { + } else if pgDB, err := s.OpenPostgresGraphConnection(); err != nil { return fmt.Errorf("failed connecting to PostgreSQL: %w", err) } else { log.Infof("Dispatching live migration from Neo4j to PostgreSQL") - migrationCtx, migrationCancelFunc := context.WithCancel(s.serverCtx) + migrationCtx, migrationCancelFunc := context.WithCancel(s.ServerCtx) s.migrationCancelFunc = migrationCancelFunc go func(ctx context.Context) { @@ -303,7 +309,7 @@ func (s *PGMigrator) startMigration() error { log.Infof("Migration to PostgreSQL completed successfully") } - if err := s.advanceState(stateIdle, stateMigrating, stateCanceling); err != nil { + if err := s.advanceState(StateIdle, StateMigrating, StateCanceling); err != nil { log.Errorf("Database migration state management error: %v", err) } }(migrationCtx) @@ -313,7 +319,7 @@ func (s *PGMigrator) startMigration() error { } func (s *PGMigrator) MigrationStart(response http.ResponseWriter, request *http.Request) { - if err := s.startMigration(); err != nil { + if err := s.StartMigration(); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": err.Error(), }, http.StatusInternalServerError, response) @@ -322,8 +328,8 @@ func (s *PGMigrator) MigrationStart(response http.ResponseWriter, request *http. } } -func (s *PGMigrator) cancelMigration() error { - if err := s.advanceState(stateCanceling, stateMigrating); err != nil { +func (s *PGMigrator) CancelMigration() error { + if err := s.advanceState(StateCanceling, StateMigrating); err != nil { return err } @@ -333,7 +339,7 @@ func (s *PGMigrator) cancelMigration() error { } func (s *PGMigrator) MigrationCancel(response http.ResponseWriter, request *http.Request) { - if err := s.cancelMigration(); err != nil { + if err := s.CancelMigration(); err != nil { api.WriteJSONResponse(request.Context(), map[string]any{ "error": err.Error(), }, http.StatusInternalServerError, response) @@ -344,6 +350,20 @@ func (s *PGMigrator) MigrationCancel(response http.ResponseWriter, request *http func (s *PGMigrator) MigrationStatus(response http.ResponseWriter, request *http.Request) { api.WriteJSONResponse(request.Context(), map[string]any{ - "state": s.state, + "state": s.State, }, http.StatusOK, response) } + +func (s *PGMigrator) OpenPostgresGraphConnection() (graph.Database, error) { + return dawgs.Open(s.ServerCtx, pg.DriverName, dawgs.Config{ + TraversalMemoryLimit: size.Gibibyte, + DriverCfg: s.Cfg.Database.PostgreSQLConnectionString(), + }) +} + +func (s *PGMigrator) OpenNeo4jGraphConnection() (graph.Database, error) { + return dawgs.Open(s.ServerCtx, neo4j.DriverName, dawgs.Config{ + TraversalMemoryLimit: size.Gibibyte, + DriverCfg: s.Cfg.Neo4J.Neo4jConnectionString(), + }) +} diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index 9c1581d292..c2b3702bb5 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -1,21 +1,22 @@ -package tools +package tools_test import ( "context" - "encoding/json" "net/http" "net/http/httptest" "testing" "time" - "github.com/specterops/bloodhound/dawgs" "github.com/specterops/bloodhound/dawgs/drivers/neo4j" "github.com/specterops/bloodhound/dawgs/drivers/pg" + pg_query "github.com/specterops/bloodhound/dawgs/drivers/pg/query" "github.com/specterops/bloodhound/dawgs/graph" graph_mocks "github.com/specterops/bloodhound/dawgs/graph/mocks" - "github.com/specterops/bloodhound/dawgs/util/size" + "github.com/specterops/bloodhound/dawgs/ops" + "github.com/specterops/bloodhound/dawgs/query" "github.com/specterops/bloodhound/graphschema" - "github.com/specterops/bloodhound/log" + "github.com/specterops/bloodhound/graphschema/common" + "github.com/specterops/bloodhound/src/api/tools" "github.com/specterops/bloodhound/src/test/integration" "github.com/specterops/bloodhound/src/test/integration/utils" "github.com/stretchr/testify/require" @@ -33,11 +34,11 @@ func TestSwitchPostgreSQL(t *testing.T) { ) // lookup creates the database_switch table if needed - driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + driver, err := tools.LookupGraphDriver(migrator.ServerCtx, migrator.Cfg) require.Nil(t, err) if driver != neo4j.DriverName { - err = SetGraphDriver(migrator.serverCtx, migrator.cfg, neo4j.DriverName) + err = tools.SetGraphDriver(migrator.ServerCtx, migrator.Cfg, neo4j.DriverName) require.Nil(t, err) } @@ -46,7 +47,9 @@ func TestSwitchPostgreSQL(t *testing.T) { response := recorder.Result() defer response.Body.Close() - driver, err = LookupGraphDriver(migrator.serverCtx, migrator.cfg) + require.Equal(t, http.StatusOK, response.StatusCode) + + driver, err = tools.LookupGraphDriver(migrator.ServerCtx, migrator.Cfg) require.Nil(t, err) require.Equal(t, pg.DriverName, driver) } @@ -61,11 +64,11 @@ func TestSwitchNeo4j(t *testing.T) { migrator = setupTestMigrator(t, ctx, graphDB) ) - driver, err := LookupGraphDriver(migrator.serverCtx, migrator.cfg) + driver, err := tools.LookupGraphDriver(migrator.ServerCtx, migrator.Cfg) require.Nil(t, err) if driver != pg.DriverName { - err = SetGraphDriver(migrator.serverCtx, migrator.cfg, pg.DriverName) + err = tools.SetGraphDriver(migrator.ServerCtx, migrator.Cfg, pg.DriverName) require.Nil(t, err) } @@ -74,35 +77,13 @@ func TestSwitchNeo4j(t *testing.T) { response := recorder.Result() defer response.Body.Close() - driver, err = LookupGraphDriver(migrator.serverCtx, migrator.cfg) + require.Equal(t, http.StatusOK, response.StatusCode) + + driver, err = tools.LookupGraphDriver(migrator.ServerCtx, migrator.Cfg) require.Nil(t, err) require.Equal(t, neo4j.DriverName, driver) } -func TestCancelMigration(t *testing.T) { - var ( - mockCtrl = gomock.NewController(t) - graphDB = graph_mocks.NewMockDatabase(mockCtrl) - request = httptest.NewRequest(http.MethodPut, "/pg-migrate/cancel", nil) - recorder = httptest.NewRecorder() - ctx, cancelFunc = context.WithCancel(request.Context()) - migrator = setupTestMigrator(t, ctx, graphDB) - ) - - // This seems kinda hacky - migrator.migrationCancelFunc = cancelFunc - migrator.advanceState(stateMigrating, stateIdle) - - migrator.MigrationCancel(recorder, request) - - response := recorder.Result() - defer response.Body.Close() - - require.Equal(t, http.StatusAccepted, response.StatusCode) - - require.Nil(t, true) -} - func TestPGMigrator(t *testing.T) { testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema()) integration.SetupDB(t) @@ -112,76 +93,96 @@ func TestPGMigrator(t *testing.T) { return nil }, func(harness integration.HarnessDetails, neo4jDB graph.Database) { var ( - request = httptest.NewRequest(http.MethodPut, "/pg-migrate/start", nil) - recorder = httptest.NewRecorder() - migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) + migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) + testID = harness.DBMigrateHarness.TestID.String() + sourceNodes []*graph.Node + sourceRels []*graph.Relationship + neoNodeKinds graph.Kinds + neoEdgeKinds graph.Kinds + err error ) - // Start migration process - migrator.MigrationStart(recorder, request) - response := recorder.Result() - defer response.Body.Close() - - require.Equal(t, http.StatusAccepted, response.StatusCode) + migrator.StartMigration() - // Poll migration status handler until we see an "idle" status + // wait until migration status returns to "idle" for { - if migratorState := checkMigrationStatus(t, migrator); migratorState == stateMigrating { - log.Infof("Migration in progress, waiting...") - time.Sleep(1000 * 1000 * 100) // 1/10th of a second - } else if migratorState == stateIdle { + if migrator.State == tools.StateMigrating { + time.Sleep(time.Second / 10) + } else if migrator.State == tools.StateIdle { break } else { - t.Fatalf("Encountered invalid migration status: %s", migratorState) + t.Fatalf("Encountered invalid migration status: %s", migrator.State) } } - // WIP: validate nodes/edges/types in pg + // query nodes/relationships/types in neo4j + neo4jDB.ReadTransaction(testContext.Context(), func(tx graph.Transaction) error { + sourceNodes, err = ops.FetchNodes(tx.Nodes()) + require.Nil(t, err) - pgDB, err := dawgs.Open(migrator.serverCtx, pg.DriverName, dawgs.Config{ - TraversalMemoryLimit: size.Gibibyte, - DriverCfg: migrator.cfg.Database.PostgreSQLConnectionString(), - }) - require.Nil(t, err) + sourceRels, err = ops.FetchRelationships(tx.Relationships()) + require.Nil(t, err) - pgDB.ReadTransaction(migrator.serverCtx, func(tx graph.Transaction) error { - return tx.Nodes().Fetch(func(cursor graph.Cursor[*graph.Node]) error { - for next := range cursor.Chan() { - log.Infof("confirming node: %+v", next) - return nil - } + neoNodeKinds, err = tools.GetNeo4jNodeKinds(testContext.Context(), tx) + require.Nil(t, err) - return cursor.Error() - }) + neoEdgeKinds, err = tools.GetNeo4jEdgeKinds(testContext.Context(), tx) + require.Nil(t, err) + return nil }) - require.Nil(t, true) - }) -} -func checkMigrationStatus(t *testing.T, migrator *PGMigrator) MigratorState { - var ( - request = httptest.NewRequest(http.MethodGet, "/pg-migrate/status", nil) - recorder = httptest.NewRecorder() - body struct{ State MigratorState } - ) + // get reference to pg graph db + pgDB, err := migrator.OpenPostgresGraphConnection() + require.Nil(t, err) - migrator.MigrationStatus(recorder, request) + // confirm that all the data from neo4j made it to pg + pgDB.ReadTransaction(testContext.Context(), func(tx graph.Transaction) error { + + // check nodes + for _, sourceNode := range sourceNodes { + id, err := sourceNode.Properties.Get(testID).String() + require.Nil(t, err) + + if targetNode, err := tx.Nodes().Filterf(func() graph.Criteria { + return query.Equals(query.NodeProperty(testID), id) + }).First(); err != nil { + t.Fatalf("Could not find migrated node with '%s' == %s", testID, id) + } else { + require.Equal(t, sourceNode.Kinds, targetNode.Kinds) + require.Equal(t, sourceNode.Properties.Get(common.Name.String()), targetNode.Properties.Get(common.Name.String())) + require.Equal(t, sourceNode.Properties.Get(common.ObjectID.String()), targetNode.Properties.Get(common.ObjectID.String())) + } + } - response := recorder.Result() - defer response.Body.Close() + // check edges + for _, sourceRel := range sourceRels { + id, err := sourceRel.Properties.Get(testID).String() + require.Nil(t, err) + + if targetRel, err := tx.Relationships().Filterf(func() graph.Criteria { + return query.Equals(query.RelationshipProperty(testID), id) + }).First(); err != nil { + t.Fatalf("Could not find migrated relationship with '%s' == %s", testID, id) + } else { + require.Equal(t, sourceRel.Kind, targetRel.Kind) + } + } - require.Equal(t, http.StatusOK, response.StatusCode) + // check kinds + targetKinds, err := pg_query.On(tx).SelectKinds() + require.Nil(t, err) - if err := json.NewDecoder(response.Body).Decode(&body); err != nil { - t.Fatal("failed to decode json") - } - require.NotEmpty(t, body.State) + for _, kind := range append(neoNodeKinds, neoEdgeKinds...) { + require.NotNil(t, targetKinds[kind]) + } - return body.State + return nil + }) + }) } -func setupTestMigrator(t *testing.T, ctx context.Context, graphDB graph.Database) *PGMigrator { +func setupTestMigrator(t *testing.T, ctx context.Context, graphDB graph.Database) *tools.PGMigrator { var ( schema = graphschema.DefaultGraphSchema() dbSwitch = graph.NewDatabaseSwitch(ctx, graphDB) @@ -190,5 +191,5 @@ func setupTestMigrator(t *testing.T, ctx context.Context, graphDB graph.Database cfg, err := utils.LoadIntegrationTestConfig() require.Nil(t, err) - return NewPGMigrator(ctx, cfg, schema, dbSwitch) + return tools.NewPGMigrator(ctx, cfg, schema, dbSwitch) } diff --git a/cmd/api/src/test/integration/harnesses.go b/cmd/api/src/test/integration/harnesses.go index 84a74a352f..5ca9bea76a 100644 --- a/cmd/api/src/test/integration/harnesses.go +++ b/cmd/api/src/test/integration/harnesses.go @@ -6280,21 +6280,47 @@ func (s *ESC4ECA) Setup(graphTestContext *GraphTestContext) { graphTestContext.NewRelationship(s.Computer7, s.CertTemplate7, ad.GenericAll) } +// Use this alias to set our custom test property in the harness +type Property string + +func (p Property) String() string { + return string(p) +} + type DBMigrateHarness struct { - Group1 *graph.Node - Computer1 *graph.Node - User1 *graph.Node + Group1 *graph.Node + Computer1 *graph.Node + User1 *graph.Node + GenericAll1 *graph.Relationship + HasSession1 *graph.Relationship + MemberOf1 *graph.Relationship + TestID Property } func (s *DBMigrateHarness) Setup(graphTestContext *GraphTestContext) { sid := RandomDomainSID() - s.Group1 = graphTestContext.NewActiveDirectoryGroup("GROUP ONE", sid) - s.Computer1 = graphTestContext.NewActiveDirectoryComputer("COMPUTER ONE", sid) - s.User1 = graphTestContext.NewActiveDirectoryUser("USER ONE", sid, false) + s.TestID = "testing_id" - graphTestContext.NewRelationship(s.Group1, s.Computer1, ad.GenericAll) - graphTestContext.NewRelationship(s.Computer1, s.User1, ad.HasSession) - graphTestContext.NewRelationship(s.User1, s.Group1, ad.MemberOf) + s.Group1 = graphTestContext.NewActiveDirectoryGroup("Group1", sid) + s.Computer1 = graphTestContext.NewActiveDirectoryComputer("Computer1", sid) + s.User1 = graphTestContext.NewActiveDirectoryUser("User1", sid, false) + s.Group1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) + s.Computer1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) + s.User1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) + // Log our property values next + graphTestContext.UpdateNode(s.Group1) + graphTestContext.UpdateNode(s.Computer1) + graphTestContext.UpdateNode(s.User1) + + s.GenericAll1 = graphTestContext.NewRelationship(s.Group1, s.Computer1, ad.GenericAll, graph.AsProperties(graph.PropertyMap{ + s.TestID: RandomObjectID(graphTestContext.testCtx), + })) + s.HasSession1 = graphTestContext.NewRelationship(s.Computer1, s.User1, ad.HasSession, graph.AsProperties(graph.PropertyMap{ + s.TestID: RandomObjectID(graphTestContext.testCtx), + })) + s.MemberOf1 = graphTestContext.NewRelationship(s.User1, s.Group1, ad.MemberOf, graph.AsProperties(graph.PropertyMap{ + s.TestID: RandomObjectID(graphTestContext.testCtx), + })) } type HarnessDetails struct { From 0a0e5b967a9354a248ad79f67d57151814ad7373 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Wed, 24 Apr 2024 12:08:33 -0700 Subject: [PATCH 05/14] update variable names --- cmd/api/src/api/tools/pg_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index c2b3702bb5..eaf101125c 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -93,13 +93,13 @@ func TestPGMigrator(t *testing.T) { return nil }, func(harness integration.HarnessDetails, neo4jDB graph.Database) { var ( - migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) - testID = harness.DBMigrateHarness.TestID.String() - sourceNodes []*graph.Node - sourceRels []*graph.Relationship - neoNodeKinds graph.Kinds - neoEdgeKinds graph.Kinds - err error + migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) + testID = harness.DBMigrateHarness.TestID.String() + sourceNodes []*graph.Node + sourceRels []*graph.Relationship + sourceNodeKinds graph.Kinds + sourceEdgeKinds graph.Kinds + err error ) migrator.StartMigration() @@ -123,10 +123,10 @@ func TestPGMigrator(t *testing.T) { sourceRels, err = ops.FetchRelationships(tx.Relationships()) require.Nil(t, err) - neoNodeKinds, err = tools.GetNeo4jNodeKinds(testContext.Context(), tx) + sourceNodeKinds, err = tools.GetNeo4jNodeKinds(testContext.Context(), tx) require.Nil(t, err) - neoEdgeKinds, err = tools.GetNeo4jEdgeKinds(testContext.Context(), tx) + sourceEdgeKinds, err = tools.GetNeo4jEdgeKinds(testContext.Context(), tx) require.Nil(t, err) return nil @@ -173,7 +173,7 @@ func TestPGMigrator(t *testing.T) { targetKinds, err := pg_query.On(tx).SelectKinds() require.Nil(t, err) - for _, kind := range append(neoNodeKinds, neoEdgeKinds...) { + for _, kind := range append(sourceNodeKinds, sourceEdgeKinds...) { require.NotNil(t, targetKinds[kind]) } From c983cb0ed2680cefc29b2054598fc5ec9429eb8f Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Thu, 25 Apr 2024 11:21:39 -0700 Subject: [PATCH 06/14] refactor kinds check --- cmd/api/src/api/tools/pg.go | 67 +++++++++-------------- cmd/api/src/api/tools/pg_test.go | 39 ++++++++----- cmd/api/src/test/integration/harnesses.go | 7 +-- 3 files changed, 52 insertions(+), 61 deletions(-) diff --git a/cmd/api/src/api/tools/pg.go b/cmd/api/src/api/tools/pg.go index fe3dd650bc..75ed1bf7d4 100644 --- a/cmd/api/src/api/tools/pg.go +++ b/cmd/api/src/api/tools/pg.go @@ -50,59 +50,42 @@ func migrateTypes(ctx context.Context, neoDB, pgDB graph.Database) error { ) if err := neoDB.ReadTransaction(ctx, func(tx graph.Transaction) error { - if nodeKinds, err := GetNeo4jNodeKinds(ctx, tx); err != nil { - return err - } else if edgeKinds, err := GetNeo4jEdgeKinds(ctx, tx); err != nil { - return err - } else { - neoNodeKinds = nodeKinds - neoEdgeKinds = edgeKinds - return nil - } - }); err != nil { - return err - } + var ( + nextKindStr string + result = tx.Raw("call db.labels();", nil) + ) - return pgDB.WriteTransaction(ctx, func(tx graph.Transaction) error { - _, err := pgDB.(*pg.Driver).KindMapper().AssertKinds(tx, append(neoNodeKinds, neoEdgeKinds...)) - return err - }) -} - -func GetNeo4jNodeKinds(ctx context.Context, tx graph.Transaction) (graph.Kinds, error) { - var ( - nextKindStr string - kinds graph.Kinds - result = tx.Raw("call db.labels();", nil) - ) + for result.Next() { + if err := result.Scan(&nextKindStr); err != nil { + return err + } - for result.Next() { - if err := result.Scan(&nextKindStr); err != nil { - return nil, err + neoNodeKinds = append(neoNodeKinds, graph.StringKind(nextKindStr)) } - kinds = append(kinds, graph.StringKind(nextKindStr)) - } + if err := result.Error(); err != nil { + return err + } - return kinds, result.Error() -} + result = tx.Raw("call db.relationshipTypes();", nil) -func GetNeo4jEdgeKinds(ctx context.Context, tx graph.Transaction) (graph.Kinds, error) { - var ( - nextKindStr string - kinds graph.Kinds - result = tx.Raw("call db.relationshipTypes();", nil) - ) + for result.Next() { + if err := result.Scan(&nextKindStr); err != nil { + return err + } - for result.Next() { - if err := result.Scan(&nextKindStr); err != nil { - return nil, err + neoEdgeKinds = append(neoEdgeKinds, graph.StringKind(nextKindStr)) } - kinds = append(kinds, graph.StringKind(nextKindStr)) + return nil + }); err != nil { + return err } - return kinds, result.Error() + return pgDB.WriteTransaction(ctx, func(tx graph.Transaction) error { + _, err := pgDB.(*pg.Driver).KindMapper().AssertKinds(tx, append(neoNodeKinds, neoEdgeKinds...)) + return err + }) } func convertNeo4jProperties(properties *graph.Properties) error { diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index eaf101125c..43390201a9 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/http/httptest" + "slices" "testing" "time" @@ -85,8 +86,11 @@ func TestSwitchNeo4j(t *testing.T) { } func TestPGMigrator(t *testing.T) { - testContext := integration.NewGraphTestContext(t, graphschema.DefaultGraphSchema()) - integration.SetupDB(t) + var ( + schema = graphschema.DefaultGraphSchema() + testContext = integration.NewGraphTestContext(t, schema) + _ = integration.SetupDB(t) + ) testContext.DatabaseTestWithSetup(func(harness *integration.HarnessDetails) error { harness.DBMigrateHarness.Setup(testContext) @@ -95,10 +99,10 @@ func TestPGMigrator(t *testing.T) { var ( migrator = setupTestMigrator(t, testContext.Context(), neo4jDB) testID = harness.DBMigrateHarness.TestID.String() - sourceNodes []*graph.Node - sourceRels []*graph.Relationship sourceNodeKinds graph.Kinds sourceEdgeKinds graph.Kinds + sourceNodes []*graph.Node + sourceEdges []*graph.Relationship err error ) @@ -115,23 +119,28 @@ func TestPGMigrator(t *testing.T) { } } - // query nodes/relationships/types in neo4j + // query nodes/relationships in neo4j neo4jDB.ReadTransaction(testContext.Context(), func(tx graph.Transaction) error { sourceNodes, err = ops.FetchNodes(tx.Nodes()) require.Nil(t, err) - sourceRels, err = ops.FetchRelationships(tx.Relationships()) - require.Nil(t, err) - - sourceNodeKinds, err = tools.GetNeo4jNodeKinds(testContext.Context(), tx) - require.Nil(t, err) - - sourceEdgeKinds, err = tools.GetNeo4jEdgeKinds(testContext.Context(), tx) + sourceEdges, err = ops.FetchRelationships(tx.Relationships()) require.Nil(t, err) return nil }) + // grab source kinds + // NOTE: the call to db.labels() in our migrator returns all possible node kinds in neo4j, while db.relationshipTypes() + // returns just those edge kinds that have an associated edge in the db, so that is the behavior we are testing here + sourceNodeKinds = schema.DefaultGraph.Nodes + + for _, edge := range sourceEdges { + if !slices.Contains(sourceEdgeKinds, edge.Kind) { + sourceEdgeKinds = append(sourceEdgeKinds, edge.Kind) + } + } + // get reference to pg graph db pgDB, err := migrator.OpenPostgresGraphConnection() require.Nil(t, err) @@ -156,8 +165,8 @@ func TestPGMigrator(t *testing.T) { } // check edges - for _, sourceRel := range sourceRels { - id, err := sourceRel.Properties.Get(testID).String() + for _, sourceEdge := range sourceEdges { + id, err := sourceEdge.Properties.Get(testID).String() require.Nil(t, err) if targetRel, err := tx.Relationships().Filterf(func() graph.Criteria { @@ -165,7 +174,7 @@ func TestPGMigrator(t *testing.T) { }).First(); err != nil { t.Fatalf("Could not find migrated relationship with '%s' == %s", testID, id) } else { - require.Equal(t, sourceRel.Kind, targetRel.Kind) + require.Equal(t, sourceEdge.Kind, targetRel.Kind) } } diff --git a/cmd/api/src/test/integration/harnesses.go b/cmd/api/src/test/integration/harnesses.go index b4d6a6c0fd..e0d3dd3de9 100644 --- a/cmd/api/src/test/integration/harnesses.go +++ b/cmd/api/src/test/integration/harnesses.go @@ -6289,11 +6289,11 @@ func (s *ESC4ECA) Setup(graphTestContext *GraphTestContext) { graphTestContext.NewRelationship(s.Computer7, s.CertTemplate7, ad.GenericAll) } -// Use this alias to set our custom test property in the harness +// Use this to set our custom test property in the migration harness type Property string -func (p Property) String() string { - return string(p) +func (s Property) String() string { + return string(s) } type DBMigrateHarness struct { @@ -6316,7 +6316,6 @@ func (s *DBMigrateHarness) Setup(graphTestContext *GraphTestContext) { s.Group1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) s.Computer1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) s.User1.Properties.Set(s.TestID.String(), RandomObjectID(graphTestContext.testCtx)) - // Log our property values next graphTestContext.UpdateNode(s.Group1) graphTestContext.UpdateNode(s.Computer1) graphTestContext.UpdateNode(s.User1) From 7834c1e503292008c0d3e9dc8577962df431fc64 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 11:06:12 -0700 Subject: [PATCH 07/14] added integration test header --- cmd/api/src/api/tools/pg_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index 43390201a9..e9e87a4938 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -1,3 +1,22 @@ +// Copyright 2024 Specter Ops, Inc. +// +// Licensed under the Apache License, Version 2.0 +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build integration +// +build integration + package tools_test import ( From e34cfbf169a2de4e7ccc812dc214568c5f75867e Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 11:40:11 -0700 Subject: [PATCH 08/14] bump ci From 9792907b293313ab9c0a71d743ca779fa054d66f Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 16:27:28 -0700 Subject: [PATCH 09/14] added step to drop pg graph schema before migrating, added documentation --- cmd/api/src/api/tools/PG_MIGRATE.md | 20 ++++++++++++++++++++ cmd/api/src/api/tools/pg.go | 12 +++++++++++- cmd/api/src/api/tools/pg_test.go | 1 - 3 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 cmd/api/src/api/tools/PG_MIGRATE.md diff --git a/cmd/api/src/api/tools/PG_MIGRATE.md b/cmd/api/src/api/tools/PG_MIGRATE.md new file mode 100644 index 0000000000..bb5d7998c6 --- /dev/null +++ b/cmd/api/src/api/tools/PG_MIGRATE.md @@ -0,0 +1,20 @@ +## Migrating Graph Data to Postgres + +### Endpoints +| Endpoint | HTTP Request | Usage | Expected Response | +| --- | --- | --- | --- | +| `/pg-migration/status/` | `GET` | Returns a status indicating whether the migrator is currently running. | **Status:** `200 OK`
{
  "state": "idle" \| "migrating" \| "canceling"
}
| +| `/pg-migration/start/` | `PUT` | Kicks off the migration process from neo4j to postgres. | **Status:** `202 Accepted` | +| `/pg-migration/cancel/` | `PUT` | Cancels the currently running migration. | **Status:** `202 Accepted` | +| `/graph-db/switch/pg/` | `PUT` | Switches the current graph database driver to postgres. | **Status"** `200 OK` | +| `/graph-db/switch/ne04j/` | `PUT` | Switches the current graph database driver to ne04j. | **Status"** `200 OK` | + +### Running a Migration +1. Confirm the migration status is currently "idle" before running a migration with the `/pg-migration/status/` endpoint. The migration will run in the same direction regardless of the currently selected graph driver. +2. Start the migration process using the `/pg-migration/start/` endpoint. Since the migration occurs asynchronously, you will want to monitor the API logs to see information regarding the currently running migration. + - When the migration starts, there should be a log with the message `"Dispatching live migration from Neo4j to PostgreSQL"` + - Upon completion, you should see the message `"Migration to PostgreSQL completed successfully"` + - Any errors that occur during the migration process will also surface here + - You can also poll the `/pg-migration/status/` endpoint and wait for an `"idle"` status to indicate the migration has completed + - An in-progess migration can be cancelled with the `pg-migration/cancel/` endpoint and run again at any time +3. Once you are ready to switch over to the postgres graph driver, you can use the `/graph-db/switch/pg/` endpoint. \ No newline at end of file diff --git a/cmd/api/src/api/tools/pg.go b/cmd/api/src/api/tools/pg.go index 8c5c0b9c4a..cdb9f0be82 100644 --- a/cmd/api/src/api/tools/pg.go +++ b/cmd/api/src/api/tools/pg.go @@ -22,10 +22,12 @@ import ( "net/http" "sync" + "github.com/jackc/pgx/v5" "github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype" "github.com/specterops/bloodhound/dawgs" "github.com/specterops/bloodhound/dawgs/drivers/neo4j" "github.com/specterops/bloodhound/dawgs/drivers/pg" + "github.com/specterops/bloodhound/dawgs/drivers/pg/query" "github.com/specterops/bloodhound/dawgs/graph" "github.com/specterops/bloodhound/dawgs/util/size" "github.com/specterops/bloodhound/log" @@ -280,7 +282,9 @@ func (s *PGMigrator) StartMigration() error { log.Infof("Starting live migration from Neo4j to PostgreSQL") - if err := pgDB.AssertSchema(ctx, s.graphSchema); err != nil { + if err := dropCurrentGraphSchema(ctx, pgDB); err != nil { + log.Errorf("Unable to drop graph schema in PostgreSQL: %v", err) + } else if err := pgDB.AssertSchema(ctx, s.graphSchema); err != nil { log.Errorf("Unable to assert graph schema in PostgreSQL: %v", err) } else if err := migrateTypes(ctx, neo4jDB, pgDB); err != nil { log.Errorf("Unable to migrate Neo4j kinds to PostgreSQL: %v", err) @@ -350,3 +354,9 @@ func (s *PGMigrator) OpenNeo4jGraphConnection() (graph.Database, error) { DriverCfg: s.Cfg.Neo4J.Neo4jConnectionString(), }) } + +func dropCurrentGraphSchema(ctx context.Context, db graph.Database) error { + return db.WriteTransaction(ctx, func(tx graph.Transaction) error { + return query.On(tx).DropSchema() + }, pg.OptionSetQueryExecMode(pgx.QueryExecModeSimpleProtocol)) +} diff --git a/cmd/api/src/api/tools/pg_test.go b/cmd/api/src/api/tools/pg_test.go index e9e87a4938..76b72fbb21 100644 --- a/cmd/api/src/api/tools/pg_test.go +++ b/cmd/api/src/api/tools/pg_test.go @@ -108,7 +108,6 @@ func TestPGMigrator(t *testing.T) { var ( schema = graphschema.DefaultGraphSchema() testContext = integration.NewGraphTestContext(t, schema) - _ = integration.SetupDB(t) ) testContext.DatabaseTestWithSetup(func(harness *integration.HarnessDetails) error { From b71ca25f0f49d1ce3a76d898dcff0686dd7d514d Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 16:28:20 -0700 Subject: [PATCH 10/14] fixed docs typo --- cmd/api/src/api/tools/PG_MIGRATE.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/api/src/api/tools/PG_MIGRATE.md b/cmd/api/src/api/tools/PG_MIGRATE.md index bb5d7998c6..ad293f12f5 100644 --- a/cmd/api/src/api/tools/PG_MIGRATE.md +++ b/cmd/api/src/api/tools/PG_MIGRATE.md @@ -6,8 +6,8 @@ | `/pg-migration/status/` | `GET` | Returns a status indicating whether the migrator is currently running. | **Status:** `200 OK`
{
  "state": "idle" \| "migrating" \| "canceling"
}
| | `/pg-migration/start/` | `PUT` | Kicks off the migration process from neo4j to postgres. | **Status:** `202 Accepted` | | `/pg-migration/cancel/` | `PUT` | Cancels the currently running migration. | **Status:** `202 Accepted` | -| `/graph-db/switch/pg/` | `PUT` | Switches the current graph database driver to postgres. | **Status"** `200 OK` | -| `/graph-db/switch/ne04j/` | `PUT` | Switches the current graph database driver to ne04j. | **Status"** `200 OK` | +| `/graph-db/switch/pg/` | `PUT` | Switches the current graph database driver to postgres. | **Status:** `200 OK` | +| `/graph-db/switch/ne04j/` | `PUT` | Switches the current graph database driver to ne04j. | **Status:** `200 OK` | ### Running a Migration 1. Confirm the migration status is currently "idle" before running a migration with the `/pg-migration/status/` endpoint. The migration will run in the same direction regardless of the currently selected graph driver. From d6a5492b064ce251905402564e5f1fa970dbfbe1 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 16:29:27 -0700 Subject: [PATCH 11/14] fixed docs formatting --- cmd/api/src/api/tools/PG_MIGRATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/api/src/api/tools/PG_MIGRATE.md b/cmd/api/src/api/tools/PG_MIGRATE.md index ad293f12f5..91ac1ad9d9 100644 --- a/cmd/api/src/api/tools/PG_MIGRATE.md +++ b/cmd/api/src/api/tools/PG_MIGRATE.md @@ -3,7 +3,7 @@ ### Endpoints | Endpoint | HTTP Request | Usage | Expected Response | | --- | --- | --- | --- | -| `/pg-migration/status/` | `GET` | Returns a status indicating whether the migrator is currently running. | **Status:** `200 OK`
{
  "state": "idle" \| "migrating" \| "canceling"
}
| +| `/pg-migration/status/` | `GET` | Returns a status indicating whether the migrator is currently running. | **Status:** `200 OK`

{
  "state": "idle" \| "migrating" \| "canceling"
}
| | `/pg-migration/start/` | `PUT` | Kicks off the migration process from neo4j to postgres. | **Status:** `202 Accepted` | | `/pg-migration/cancel/` | `PUT` | Cancels the currently running migration. | **Status:** `202 Accepted` | | `/graph-db/switch/pg/` | `PUT` | Switches the current graph database driver to postgres. | **Status:** `200 OK` | From 2bb2d299f93e25120f46bd6d6a8811d5f7b48b98 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 16:34:13 -0700 Subject: [PATCH 12/14] updated docs --- cmd/api/src/api/tools/PG_MIGRATE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/api/src/api/tools/PG_MIGRATE.md b/cmd/api/src/api/tools/PG_MIGRATE.md index 91ac1ad9d9..ff0d70e0ff 100644 --- a/cmd/api/src/api/tools/PG_MIGRATE.md +++ b/cmd/api/src/api/tools/PG_MIGRATE.md @@ -1,4 +1,4 @@ -## Migrating Graph Data to Postgres +## Migrating Graph Data from Neo4j to Postgres ### Endpoints | Endpoint | HTTP Request | Usage | Expected Response | From 63fcc87a6a8cc9747987490b80dbb9d57fadbcd2 Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Tue, 7 May 2024 18:07:17 -0700 Subject: [PATCH 13/14] bump ci From 3ffa5b9b43326c95dcb2c227c33a6b90f5b48e9e Mon Sep 17 00:00:00 2001 From: Wesley Robert Maffly-Kipp Date: Thu, 16 May 2024 14:16:17 -0700 Subject: [PATCH 14/14] switched pg testing db to correct version --- docker-compose.testing.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.testing.yml b/docker-compose.testing.yml index b4196658e2..2e5f798078 100644 --- a/docker-compose.testing.yml +++ b/docker-compose.testing.yml @@ -17,7 +17,7 @@ services: testdb: restart: unless-stopped - image: docker.io/library/postgres:16.2 + image: docker.io/library/postgres:16.1 command: "-c log_statement=all" environment: - POSTGRES_USER=bloodhound