diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index bc6ec0d6ea2..0aa79f2f0b2 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -155,7 +155,7 @@ func TestORM(t *testing.T) { require.NoError(t, err) require.Len(t, dbSpecs, 3) - err = orm.DeleteJob(testutils.Context(t), jb.ID) + err = orm.DeleteJob(testutils.Context(t), jb.ID, jb.Type) require.NoError(t, err) dbSpecs = []job.Job{} @@ -312,7 +312,7 @@ func TestORM(t *testing.T) { require.Equal(t, bhsJob.BlockhashStoreSpec.RunTimeout, savedJob.BlockhashStoreSpec.RunTimeout) require.Equal(t, bhsJob.BlockhashStoreSpec.EVMChainID, savedJob.BlockhashStoreSpec.EVMChainID) require.Equal(t, bhsJob.BlockhashStoreSpec.FromAddresses, savedJob.BlockhashStoreSpec.FromAddresses) - err = orm.DeleteJob(ctx, bhsJob.ID) + err = orm.DeleteJob(ctx, bhsJob.ID, bhsJob.Type) require.NoError(t, err) _, err = orm.FindJob(testutils.Context(t), bhsJob.ID) require.Error(t, err) @@ -344,7 +344,7 @@ func TestORM(t *testing.T) { require.Equal(t, bhsJob.BlockHeaderFeederSpec.FromAddresses, savedJob.BlockHeaderFeederSpec.FromAddresses) require.Equal(t, bhsJob.BlockHeaderFeederSpec.GetBlockhashesBatchSize, savedJob.BlockHeaderFeederSpec.GetBlockhashesBatchSize) require.Equal(t, bhsJob.BlockHeaderFeederSpec.StoreBlockhashesBatchSize, savedJob.BlockHeaderFeederSpec.StoreBlockhashesBatchSize) - err = orm.DeleteJob(ctx, bhsJob.ID) + err = orm.DeleteJob(ctx, bhsJob.ID, bhsJob.Type) require.NoError(t, err) _, err = orm.FindJob(testutils.Context(t), bhsJob.ID) require.Error(t, err) @@ -387,7 +387,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { cltest.AssertCount(t, db, "ocr_oracle_specs", 1) cltest.AssertCount(t, db, "pipeline_specs", 1) - err = jobORM.DeleteJob(ctx, jb.ID) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) require.NoError(t, err) cltest.AssertCount(t, db, "ocr_oracle_specs", 0) cltest.AssertCount(t, db, "pipeline_specs", 0) @@ -403,7 +403,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { cltest.AssertCount(t, db, "keeper_registries", 1) cltest.AssertCount(t, db, "upkeep_registrations", 1) - err := jobORM.DeleteJob(ctx, keeperJob.ID) + err := jobORM.DeleteJob(ctx, keeperJob.ID, keeperJob.Type) require.NoError(t, err) cltest.AssertCount(t, db, "keeper_specs", 0) cltest.AssertCount(t, db, "keeper_registries", 0) @@ -423,7 +423,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { require.NoError(t, err) cltest.AssertCount(t, db, "vrf_specs", 1) cltest.AssertCount(t, db, "jobs", 1) - err = jobORM.DeleteJob(ctx, jb.ID) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) require.NoError(t, err) cltest.AssertCount(t, db, "vrf_specs", 0) cltest.AssertCount(t, db, "jobs", 0) @@ -436,7 +436,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) { _, err := db.Exec(`INSERT INTO external_initiator_webhook_specs (external_initiator_id, webhook_spec_id, spec) VALUES ($1,$2,$3)`, ei.ID, webhookSpec.ID, `{"ei": "foo", "name": "webhookSpecTwoEIs"}`) require.NoError(t, err) - err = jobORM.DeleteJob(ctx, jb.ID) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) require.NoError(t, err) cltest.AssertCount(t, db, "webhook_specs", 0) cltest.AssertCount(t, db, "external_initiator_webhook_specs", 0) @@ -523,7 +523,7 @@ func TestORM_CreateJob_VRFV2(t *testing.T) { var vrfOwnerAddress evmtypes.EIP55Address require.NoError(t, db.Get(&vrfOwnerAddress, `SELECT vrf_owner_address FROM vrf_specs LIMIT 1`)) require.Equal(t, "0x32891BD79647DC9136Fc0a59AAB48c7825eb624c", vrfOwnerAddress.Address().String()) - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) cltest.AssertCount(t, db, "vrf_specs", 0) cltest.AssertCount(t, db, "jobs", 0) @@ -536,7 +536,7 @@ func TestORM_CreateJob_VRFV2(t *testing.T) { require.Equal(t, int64(0), requestedConfsDelay) require.NoError(t, db.Get(&requestTimeout, `SELECT request_timeout FROM vrf_specs LIMIT 1`)) require.Equal(t, 1*time.Hour, requestTimeout) - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) cltest.AssertCount(t, db, "vrf_specs", 0) cltest.AssertCount(t, db, "jobs", 0) } @@ -607,7 +607,7 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) { require.ElementsMatch(t, fromAddresses, actual) var vrfOwnerAddress evmtypes.EIP55Address require.Error(t, db.Get(&vrfOwnerAddress, `SELECT vrf_owner_address FROM vrf_specs LIMIT 1`)) - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) cltest.AssertCount(t, db, "vrf_specs", 0) cltest.AssertCount(t, db, "jobs", 0) @@ -624,7 +624,7 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) { require.Equal(t, int64(0), requestedConfsDelay) require.NoError(t, db.Get(&requestTimeout, `SELECT request_timeout FROM vrf_specs LIMIT 1`)) require.Equal(t, 1*time.Hour, requestTimeout) - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) cltest.AssertCount(t, db, "vrf_specs", 0) cltest.AssertCount(t, db, "jobs", 0) } @@ -652,7 +652,7 @@ func TestORM_CreateJob_OCRBootstrap(t *testing.T) { require.NoError(t, db.Get(&relay, `SELECT relay FROM bootstrap_specs LIMIT 1`)) require.Equal(t, "evm", relay) - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) cltest.AssertCount(t, db, "bootstrap_specs", 0) cltest.AssertCount(t, db, "jobs", 0) } diff --git a/core/services/job/kv_orm_test.go b/core/services/job/kv_orm_test.go index 0f229f09d88..f943a4bb6b4 100644 --- a/core/services/job/kv_orm_test.go +++ b/core/services/job/kv_orm_test.go @@ -73,5 +73,5 @@ func TestJobKVStore(t *testing.T) { require.NoError(t, err) require.Equal(t, td2, fetchedBytes) - require.NoError(t, jobORM.DeleteJob(ctx, jobID)) + require.NoError(t, jobORM.DeleteJob(ctx, jobID, jb.Type)) } diff --git a/core/services/job/mocks/orm.go b/core/services/job/mocks/orm.go index 38bf08ab3fd..89426b55a21 100644 --- a/core/services/job/mocks/orm.go +++ b/core/services/job/mocks/orm.go @@ -277,17 +277,17 @@ func (_c *ORM_DataSource_Call) RunAndReturn(run func() sqlutil.DataSource) *ORM_ return _c } -// DeleteJob provides a mock function with given fields: ctx, id -func (_m *ORM) DeleteJob(ctx context.Context, id int32) error { - ret := _m.Called(ctx, id) +// DeleteJob provides a mock function with given fields: ctx, id, jobType +func (_m *ORM) DeleteJob(ctx context.Context, id int32, jobType job.Type) error { + ret := _m.Called(ctx, id, jobType) if len(ret) == 0 { panic("no return value specified for DeleteJob") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, int32) error); ok { - r0 = rf(ctx, id) + if rf, ok := ret.Get(0).(func(context.Context, int32, job.Type) error); ok { + r0 = rf(ctx, id, jobType) } else { r0 = ret.Error(0) } @@ -303,13 +303,14 @@ type ORM_DeleteJob_Call struct { // DeleteJob is a helper method to define mock.On call // - ctx context.Context // - id int32 -func (_e *ORM_Expecter) DeleteJob(ctx interface{}, id interface{}) *ORM_DeleteJob_Call { - return &ORM_DeleteJob_Call{Call: _e.mock.On("DeleteJob", ctx, id)} +// - jobType job.Type +func (_e *ORM_Expecter) DeleteJob(ctx interface{}, id interface{}, jobType interface{}) *ORM_DeleteJob_Call { + return &ORM_DeleteJob_Call{Call: _e.mock.On("DeleteJob", ctx, id, jobType)} } -func (_c *ORM_DeleteJob_Call) Run(run func(ctx context.Context, id int32)) *ORM_DeleteJob_Call { +func (_c *ORM_DeleteJob_Call) Run(run func(ctx context.Context, id int32, jobType job.Type)) *ORM_DeleteJob_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int32)) + run(args[0].(context.Context), args[1].(int32), args[2].(job.Type)) }) return _c } @@ -319,7 +320,7 @@ func (_c *ORM_DeleteJob_Call) Return(_a0 error) *ORM_DeleteJob_Call { return _c } -func (_c *ORM_DeleteJob_Call) RunAndReturn(run func(context.Context, int32) error) *ORM_DeleteJob_Call { +func (_c *ORM_DeleteJob_Call) RunAndReturn(run func(context.Context, int32, job.Type) error) *ORM_DeleteJob_Call { _c.Call.Return(run) return _c } diff --git a/core/services/job/orm.go b/core/services/job/orm.go index a86da5f7111..1e1d8a98700 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -52,7 +52,7 @@ type ORM interface { FindJobIDByAddress(ctx context.Context, address evmtypes.EIP55Address, evmChainID *big.Big) (int32, error) FindOCR2JobIDByAddress(ctx context.Context, contractID string, feedID *common.Hash) (int32, error) FindJobIDsWithBridge(ctx context.Context, name string) ([]int32, error) - DeleteJob(ctx context.Context, id int32) error + DeleteJob(ctx context.Context, id int32, jobType Type) error RecordError(ctx context.Context, jobID int32, description string) error // TryRecordError is a helper which calls RecordError and logs the returned error if present. TryRecordError(ctx context.Context, jobID int32, description string) @@ -709,14 +709,35 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error { } // DeleteJob removes a job -func (o *orm) DeleteJob(ctx context.Context, id int32) error { +func (o *orm) DeleteJob(ctx context.Context, id int32, jobType Type) error { o.lggr.Debugw("Deleting job", "jobID", id) + queries := map[Type]string{ + DirectRequest: `DELETE FROM direct_request_specs WHERE id IN (SELECT direct_request_spec_id FROM deleted_jobs)`, + FluxMonitor: `DELETE FROM flux_monitor_specs WHERE id IN (SELECT flux_monitor_spec_id FROM deleted_jobs)`, + OffchainReporting: `DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs)`, + OffchainReporting2: `DELETE FROM ocr2_oracle_specs WHERE id IN (SELECT ocr2_oracle_spec_id FROM deleted_jobs)`, + Keeper: `DELETE FROM keeper_specs WHERE id IN (SELECT keeper_spec_id FROM deleted_jobs)`, + Cron: `DELETE FROM cron_specs WHERE id IN (SELECT cron_spec_id FROM deleted_jobs)`, + VRF: `DELETE FROM vrf_specs WHERE id IN (SELECT vrf_spec_id FROM deleted_jobs)`, + Webhook: `DELETE FROM webhook_specs WHERE id IN (SELECT webhook_spec_id FROM deleted_jobs)`, + BlockhashStore: `DELETE FROM blockhash_store_specs WHERE id IN (SELECT blockhash_store_spec_id FROM deleted_jobs)`, + Bootstrap: `DELETE FROM bootstrap_specs WHERE id IN (SELECT bootstrap_spec_id FROM deleted_jobs)`, + BlockHeaderFeeder: `DELETE FROM block_header_feeder_specs WHERE id IN (SELECT block_header_feeder_spec_id FROM deleted_jobs)`, + Gateway: `DELETE FROM gateway_specs WHERE id IN (SELECT gateway_spec_id FROM deleted_jobs)`, + Workflow: `DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)`, + StandardCapabilities: `DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs)`, + CCIP: `DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs)`, + } + q, ok := queries[jobType] + if !ok { + return errors.Errorf("job type %s not supported", jobType) + } // Added a 1-minute timeout to this query since this can take a long time as data increases. // This was added specifically due to an issue with a database that had a million of pipeline_runs and pipeline_task_runs // and this query was taking ~40secs. ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute) defer cancel() - query := ` + query := fmt.Sprintf(` WITH deleted_jobs AS ( DELETE FROM jobs WHERE id = $1 RETURNING id, @@ -736,55 +757,13 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error { standard_capabilities_spec_id, ccip_spec_id ), - deleted_oracle_specs AS ( - DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs) - ), - deleted_oracle2_specs AS ( - DELETE FROM ocr2_oracle_specs WHERE id IN (SELECT ocr2_oracle_spec_id FROM deleted_jobs) - ), - deleted_keeper_specs AS ( - DELETE FROM keeper_specs WHERE id IN (SELECT keeper_spec_id FROM deleted_jobs) - ), - deleted_cron_specs AS ( - DELETE FROM cron_specs WHERE id IN (SELECT cron_spec_id FROM deleted_jobs) - ), - deleted_fm_specs AS ( - DELETE FROM flux_monitor_specs WHERE id IN (SELECT flux_monitor_spec_id FROM deleted_jobs) - ), - deleted_vrf_specs AS ( - DELETE FROM vrf_specs WHERE id IN (SELECT vrf_spec_id FROM deleted_jobs) - ), - deleted_webhook_specs AS ( - DELETE FROM webhook_specs WHERE id IN (SELECT webhook_spec_id FROM deleted_jobs) - ), - deleted_dr_specs AS ( - DELETE FROM direct_request_specs WHERE id IN (SELECT direct_request_spec_id FROM deleted_jobs) - ), - deleted_blockhash_store_specs AS ( - DELETE FROM blockhash_store_specs WHERE id IN (SELECT blockhash_store_spec_id FROM deleted_jobs) - ), - deleted_bootstrap_specs AS ( - DELETE FROM bootstrap_specs WHERE id IN (SELECT bootstrap_spec_id FROM deleted_jobs) - ), - deleted_block_header_feeder_specs AS ( - DELETE FROM block_header_feeder_specs WHERE id IN (SELECT block_header_feeder_spec_id FROM deleted_jobs) - ), - deleted_gateway_specs AS ( - DELETE FROM gateway_specs WHERE id IN (SELECT gateway_spec_id FROM deleted_jobs) - ), - deleted_workflow_specs AS ( - DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs) - ), - deleted_standardcapabilities_specs AS ( - DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs) - ), - deleted_ccip_specs AS ( - DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs) + deleted_specific_specs AS ( + %s ), deleted_job_pipeline_specs AS ( DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id ) - DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)` + DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)`, q) res, err := o.ds.ExecContext(ctx, query, id) if err != nil { return errors.Wrap(err, "DeleteJob failed to delete job") diff --git a/core/services/job/orm_test.go b/core/services/job/orm_test.go index 11f3e94f2d4..6bfe141ad59 100644 --- a/core/services/job/orm_test.go +++ b/core/services/job/orm_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" diff --git a/core/services/job/runner_integration_test.go b/core/services/job/runner_integration_test.go index af3a1f5698d..7856c4f151f 100644 --- a/core/services/job/runner_integration_test.go +++ b/core/services/job/runner_integration_test.go @@ -185,7 +185,7 @@ func TestRunner(t *testing.T) { require.Equal(t, 1, len(jids)) // But if we delete the job, then we can. - require.NoError(t, jobORM.DeleteJob(ctx, jb.ID)) + require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type)) jids, err = jobORM.FindJobIDsWithBridge(ctx, bridge.Name.String()) require.NoError(t, err) require.Equal(t, 0, len(jids)) @@ -660,7 +660,7 @@ answer1 [type=median index=0]; } // Ensure we can delete an errored - err = jobORM.DeleteJob(ctx, jb.ID) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) require.NoError(t, err) se = []job.SpecError{} err = db.Select(&se, `SELECT * FROM job_spec_errors`) @@ -745,7 +745,7 @@ answer1 [type=median index=0]; assert.Equal(t, "4242", results.Values[0].(decimal.Decimal).String()) // Delete the job - err = jobORM.DeleteJob(ctx, jb.ID) + err = jobORM.DeleteJob(ctx, jb.ID, jb.Type) require.NoError(t, err) // Create another run, it should fail diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 16889cbe10b..cc45320de10 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -315,7 +315,7 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i lggr.Debugw("Callback: BeforeDeleteJob done") err := sqlutil.Transact(ctx, js.orm.WithDataSource, ds, nil, func(tx ORM) error { - err := tx.DeleteJob(ctx, jobID) + err := tx.DeleteJob(ctx, jobID, aj.spec.Type) if err != nil { js.lggr.Errorw("Error deleting job", "jobID", jobID, "err", err) return err