Skip to content

Commit

Permalink
Refactor DeleteJob to improve performance (#15228)
Browse files Browse the repository at this point in the history
* Refactors `DeleteJob` to receive the job type

* Fixes CI
  • Loading branch information
vyzaldysanchez authored Nov 14, 2024
1 parent 2fcfb81 commit 99ab8b4
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 75 deletions.
24 changes: 12 additions & 12 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestORM(t *testing.T) {
require.NoError(t, err)
require.Len(t, dbSpecs, 3)

err = orm.DeleteJob(testutils.Context(t), jb.ID)
err = orm.DeleteJob(testutils.Context(t), jb.ID, jb.Type)
require.NoError(t, err)

dbSpecs = []job.Job{}
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestORM(t *testing.T) {
require.Equal(t, bhsJob.BlockhashStoreSpec.RunTimeout, savedJob.BlockhashStoreSpec.RunTimeout)
require.Equal(t, bhsJob.BlockhashStoreSpec.EVMChainID, savedJob.BlockhashStoreSpec.EVMChainID)
require.Equal(t, bhsJob.BlockhashStoreSpec.FromAddresses, savedJob.BlockhashStoreSpec.FromAddresses)
err = orm.DeleteJob(ctx, bhsJob.ID)
err = orm.DeleteJob(ctx, bhsJob.ID, bhsJob.Type)
require.NoError(t, err)
_, err = orm.FindJob(testutils.Context(t), bhsJob.ID)
require.Error(t, err)
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestORM(t *testing.T) {
require.Equal(t, bhsJob.BlockHeaderFeederSpec.FromAddresses, savedJob.BlockHeaderFeederSpec.FromAddresses)
require.Equal(t, bhsJob.BlockHeaderFeederSpec.GetBlockhashesBatchSize, savedJob.BlockHeaderFeederSpec.GetBlockhashesBatchSize)
require.Equal(t, bhsJob.BlockHeaderFeederSpec.StoreBlockhashesBatchSize, savedJob.BlockHeaderFeederSpec.StoreBlockhashesBatchSize)
err = orm.DeleteJob(ctx, bhsJob.ID)
err = orm.DeleteJob(ctx, bhsJob.ID, bhsJob.Type)
require.NoError(t, err)
_, err = orm.FindJob(testutils.Context(t), bhsJob.ID)
require.Error(t, err)
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
cltest.AssertCount(t, db, "ocr_oracle_specs", 1)
cltest.AssertCount(t, db, "pipeline_specs", 1)

err = jobORM.DeleteJob(ctx, jb.ID)
err = jobORM.DeleteJob(ctx, jb.ID, jb.Type)
require.NoError(t, err)
cltest.AssertCount(t, db, "ocr_oracle_specs", 0)
cltest.AssertCount(t, db, "pipeline_specs", 0)
Expand All @@ -403,7 +403,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
cltest.AssertCount(t, db, "keeper_registries", 1)
cltest.AssertCount(t, db, "upkeep_registrations", 1)

err := jobORM.DeleteJob(ctx, keeperJob.ID)
err := jobORM.DeleteJob(ctx, keeperJob.ID, keeperJob.Type)
require.NoError(t, err)
cltest.AssertCount(t, db, "keeper_specs", 0)
cltest.AssertCount(t, db, "keeper_registries", 0)
Expand All @@ -423,7 +423,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
require.NoError(t, err)
cltest.AssertCount(t, db, "vrf_specs", 1)
cltest.AssertCount(t, db, "jobs", 1)
err = jobORM.DeleteJob(ctx, jb.ID)
err = jobORM.DeleteJob(ctx, jb.ID, jb.Type)
require.NoError(t, err)
cltest.AssertCount(t, db, "vrf_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)
Expand All @@ -436,7 +436,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
_, err := db.Exec(`INSERT INTO external_initiator_webhook_specs (external_initiator_id, webhook_spec_id, spec) VALUES ($1,$2,$3)`, ei.ID, webhookSpec.ID, `{"ei": "foo", "name": "webhookSpecTwoEIs"}`)
require.NoError(t, err)

err = jobORM.DeleteJob(ctx, jb.ID)
err = jobORM.DeleteJob(ctx, jb.ID, jb.Type)
require.NoError(t, err)
cltest.AssertCount(t, db, "webhook_specs", 0)
cltest.AssertCount(t, db, "external_initiator_webhook_specs", 0)
Expand Down Expand Up @@ -523,7 +523,7 @@ func TestORM_CreateJob_VRFV2(t *testing.T) {
var vrfOwnerAddress evmtypes.EIP55Address
require.NoError(t, db.Get(&vrfOwnerAddress, `SELECT vrf_owner_address FROM vrf_specs LIMIT 1`))
require.Equal(t, "0x32891BD79647DC9136Fc0a59AAB48c7825eb624c", vrfOwnerAddress.Address().String())
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
cltest.AssertCount(t, db, "vrf_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)

Expand All @@ -536,7 +536,7 @@ func TestORM_CreateJob_VRFV2(t *testing.T) {
require.Equal(t, int64(0), requestedConfsDelay)
require.NoError(t, db.Get(&requestTimeout, `SELECT request_timeout FROM vrf_specs LIMIT 1`))
require.Equal(t, 1*time.Hour, requestTimeout)
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
cltest.AssertCount(t, db, "vrf_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)
}
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
require.ElementsMatch(t, fromAddresses, actual)
var vrfOwnerAddress evmtypes.EIP55Address
require.Error(t, db.Get(&vrfOwnerAddress, `SELECT vrf_owner_address FROM vrf_specs LIMIT 1`))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
cltest.AssertCount(t, db, "vrf_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)

Expand All @@ -624,7 +624,7 @@ func TestORM_CreateJob_VRFV2Plus(t *testing.T) {
require.Equal(t, int64(0), requestedConfsDelay)
require.NoError(t, db.Get(&requestTimeout, `SELECT request_timeout FROM vrf_specs LIMIT 1`))
require.Equal(t, 1*time.Hour, requestTimeout)
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
cltest.AssertCount(t, db, "vrf_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)
}
Expand Down Expand Up @@ -652,7 +652,7 @@ func TestORM_CreateJob_OCRBootstrap(t *testing.T) {
require.NoError(t, db.Get(&relay, `SELECT relay FROM bootstrap_specs LIMIT 1`))
require.Equal(t, "evm", relay)

require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
cltest.AssertCount(t, db, "bootstrap_specs", 0)
cltest.AssertCount(t, db, "jobs", 0)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ func TestJobKVStore(t *testing.T) {
require.NoError(t, err)
require.Equal(t, td2, fetchedBytes)

require.NoError(t, jobORM.DeleteJob(ctx, jobID))
require.NoError(t, jobORM.DeleteJob(ctx, jobID, jb.Type))
}
21 changes: 11 additions & 10 deletions core/services/job/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 27 additions & 48 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type ORM interface {
FindJobIDByAddress(ctx context.Context, address evmtypes.EIP55Address, evmChainID *big.Big) (int32, error)
FindOCR2JobIDByAddress(ctx context.Context, contractID string, feedID *common.Hash) (int32, error)
FindJobIDsWithBridge(ctx context.Context, name string) ([]int32, error)
DeleteJob(ctx context.Context, id int32) error
DeleteJob(ctx context.Context, id int32, jobType Type) error
RecordError(ctx context.Context, jobID int32, description string) error
// TryRecordError is a helper which calls RecordError and logs the returned error if present.
TryRecordError(ctx context.Context, jobID int32, description string)
Expand Down Expand Up @@ -709,14 +709,35 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error {
}

// DeleteJob removes a job
func (o *orm) DeleteJob(ctx context.Context, id int32) error {
func (o *orm) DeleteJob(ctx context.Context, id int32, jobType Type) error {
o.lggr.Debugw("Deleting job", "jobID", id)
queries := map[Type]string{
DirectRequest: `DELETE FROM direct_request_specs WHERE id IN (SELECT direct_request_spec_id FROM deleted_jobs)`,
FluxMonitor: `DELETE FROM flux_monitor_specs WHERE id IN (SELECT flux_monitor_spec_id FROM deleted_jobs)`,
OffchainReporting: `DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs)`,
OffchainReporting2: `DELETE FROM ocr2_oracle_specs WHERE id IN (SELECT ocr2_oracle_spec_id FROM deleted_jobs)`,
Keeper: `DELETE FROM keeper_specs WHERE id IN (SELECT keeper_spec_id FROM deleted_jobs)`,
Cron: `DELETE FROM cron_specs WHERE id IN (SELECT cron_spec_id FROM deleted_jobs)`,
VRF: `DELETE FROM vrf_specs WHERE id IN (SELECT vrf_spec_id FROM deleted_jobs)`,
Webhook: `DELETE FROM webhook_specs WHERE id IN (SELECT webhook_spec_id FROM deleted_jobs)`,
BlockhashStore: `DELETE FROM blockhash_store_specs WHERE id IN (SELECT blockhash_store_spec_id FROM deleted_jobs)`,
Bootstrap: `DELETE FROM bootstrap_specs WHERE id IN (SELECT bootstrap_spec_id FROM deleted_jobs)`,
BlockHeaderFeeder: `DELETE FROM block_header_feeder_specs WHERE id IN (SELECT block_header_feeder_spec_id FROM deleted_jobs)`,
Gateway: `DELETE FROM gateway_specs WHERE id IN (SELECT gateway_spec_id FROM deleted_jobs)`,
Workflow: `DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)`,
StandardCapabilities: `DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs)`,
CCIP: `DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs)`,
}
q, ok := queries[jobType]
if !ok {
return errors.Errorf("job type %s not supported", jobType)
}
// Added a 1-minute timeout to this query since this can take a long time as data increases.
// This was added specifically due to an issue with a database that had a million pipeline_runs and pipeline_task_runs,
// where this query was taking ~40 seconds.
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute)
defer cancel()
query := `
query := fmt.Sprintf(`
WITH deleted_jobs AS (
DELETE FROM jobs WHERE id = $1 RETURNING
id,
Expand All @@ -736,55 +757,13 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error {
standard_capabilities_spec_id,
ccip_spec_id
),
deleted_oracle_specs AS (
DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs)
),
deleted_oracle2_specs AS (
DELETE FROM ocr2_oracle_specs WHERE id IN (SELECT ocr2_oracle_spec_id FROM deleted_jobs)
),
deleted_keeper_specs AS (
DELETE FROM keeper_specs WHERE id IN (SELECT keeper_spec_id FROM deleted_jobs)
),
deleted_cron_specs AS (
DELETE FROM cron_specs WHERE id IN (SELECT cron_spec_id FROM deleted_jobs)
),
deleted_fm_specs AS (
DELETE FROM flux_monitor_specs WHERE id IN (SELECT flux_monitor_spec_id FROM deleted_jobs)
),
deleted_vrf_specs AS (
DELETE FROM vrf_specs WHERE id IN (SELECT vrf_spec_id FROM deleted_jobs)
),
deleted_webhook_specs AS (
DELETE FROM webhook_specs WHERE id IN (SELECT webhook_spec_id FROM deleted_jobs)
),
deleted_dr_specs AS (
DELETE FROM direct_request_specs WHERE id IN (SELECT direct_request_spec_id FROM deleted_jobs)
),
deleted_blockhash_store_specs AS (
DELETE FROM blockhash_store_specs WHERE id IN (SELECT blockhash_store_spec_id FROM deleted_jobs)
),
deleted_bootstrap_specs AS (
DELETE FROM bootstrap_specs WHERE id IN (SELECT bootstrap_spec_id FROM deleted_jobs)
),
deleted_block_header_feeder_specs AS (
DELETE FROM block_header_feeder_specs WHERE id IN (SELECT block_header_feeder_spec_id FROM deleted_jobs)
),
deleted_gateway_specs AS (
DELETE FROM gateway_specs WHERE id IN (SELECT gateway_spec_id FROM deleted_jobs)
),
deleted_workflow_specs AS (
DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)
),
deleted_standardcapabilities_specs AS (
DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs)
),
deleted_ccip_specs AS (
DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs)
deleted_specific_specs AS (
%s
),
deleted_job_pipeline_specs AS (
DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id
)
DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)`
DELETE FROM pipeline_specs WHERE id IN (SELECT pipeline_spec_id FROM deleted_job_pipeline_specs)`, q)
res, err := o.ds.ExecContext(ctx, query, id)
if err != nil {
return errors.Wrap(err, "DeleteJob failed to delete job")
Expand Down
1 change: 1 addition & 0 deletions core/services/job/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
Expand Down
6 changes: 3 additions & 3 deletions core/services/job/runner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestRunner(t *testing.T) {
require.Equal(t, 1, len(jids))

// But if we delete the job, then we can.
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID))
require.NoError(t, jobORM.DeleteJob(ctx, jb.ID, jb.Type))
jids, err = jobORM.FindJobIDsWithBridge(ctx, bridge.Name.String())
require.NoError(t, err)
require.Equal(t, 0, len(jids))
Expand Down Expand Up @@ -660,7 +660,7 @@ answer1 [type=median index=0];
}

// Ensure we can delete an errored job
err = jobORM.DeleteJob(ctx, jb.ID)
err = jobORM.DeleteJob(ctx, jb.ID, jb.Type)
require.NoError(t, err)
se = []job.SpecError{}
err = db.Select(&se, `SELECT * FROM job_spec_errors`)
Expand Down Expand Up @@ -745,7 +745,7 @@ answer1 [type=median index=0];
assert.Equal(t, "4242", results.Values[0].(decimal.Decimal).String())

// Delete the job
err = jobORM.DeleteJob(ctx, jb.ID)
err = jobORM.DeleteJob(ctx, jb.ID, jb.Type)
require.NoError(t, err)

// Create another run, it should fail
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i
lggr.Debugw("Callback: BeforeDeleteJob done")

err := sqlutil.Transact(ctx, js.orm.WithDataSource, ds, nil, func(tx ORM) error {
err := tx.DeleteJob(ctx, jobID)
err := tx.DeleteJob(ctx, jobID, aj.spec.Type)
if err != nil {
js.lggr.Errorw("Error deleting job", "jobID", jobID, "err", err)
return err
Expand Down

0 comments on commit 99ab8b4

Please sign in to comment.