Skip to content

Commit

Permalink
Fixes txmgr tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzaldysanchez committed Mar 19, 2024
1 parent 47b2cf8 commit 0831653
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) {
eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker)
ctx := testutils.Context(t)

require.NoError(t, commonutils.JustError(db.Exec(`SET CONSTRAINTS fk_pipeline_runs_job DEFERRED`)))
require.NoError(t, commonutils.JustError(db.Exec(`SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`)))

t.Run("if external wallet sent a transaction from the account and now the nonce is one higher than it should be and we got replacement underpriced then we assume a previous transaction of ours was the one that succeeded, and hand off to EthConfirmer", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"

commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
commonfee "github.com/smartcontractkit/chainlink/v2/common/fee"
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
Expand Down Expand Up @@ -2954,6 +2955,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

minConfirmations := int64(2)

pgtest.MustExec(t, db, `SET CONSTRAINTS fk_pipeline_runs_job DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`)

t.Run("doesn't process task runs that are not suspended (possibly already previously resumed)", func(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -627,6 +628,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

pgtest.MustExec(t, db, `SET CONSTRAINTS fk_pipeline_runs_job DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`)

head := evmtypes.Head{
Expand Down Expand Up @@ -656,7 +658,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
run2 := cltest.MustInsertPipelineRunWithStatus(t, db, 0, pipeline.RunStatusCompleted)
run2 := cltest.MustInsertPipelineRunWithStatus(t, db, 0, pipeline.RunStatusCompleted, 0)
tr2 := cltest.MustInsertUnfinishedPipelineTaskRun(t, db, run2.ID)
etx2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": false}'`)
Expand Down
7 changes: 4 additions & 3 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/auth"
Expand Down Expand Up @@ -449,11 +450,11 @@ func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, regi
}

func MustInsertPipelineRun(t *testing.T, db *sqlx.DB) (run pipeline.Run) {
require.NoError(t, db.Get(&run, `INSERT INTO pipeline_runs (state,pipeline_spec_id,created_at) VALUES ($1, 0, NOW()) RETURNING *`, pipeline.RunStatusRunning))
require.NoError(t, db.Get(&run, `INSERT INTO pipeline_runs (state,pipeline_spec_id,pruning_key,created_at) VALUES ($1, 0, 0, NOW()) RETURNING *`, pipeline.RunStatusRunning))
return run
}

func MustInsertPipelineRunWithStatus(t *testing.T, db *sqlx.DB, pipelineSpecID int32, status pipeline.RunStatus) (run pipeline.Run) {
func MustInsertPipelineRunWithStatus(t *testing.T, db *sqlx.DB, pipelineSpecID int32, status pipeline.RunStatus, jobID int32) (run pipeline.Run) {
var finishedAt *time.Time
var outputs pipeline.JSONSerializable
var allErrors pipeline.RunErrors
Expand All @@ -475,7 +476,7 @@ func MustInsertPipelineRunWithStatus(t *testing.T, db *sqlx.DB, pipelineSpecID i
default:
t.Fatalf("unknown status: %s", status)
}
require.NoError(t, db.Get(&run, `INSERT INTO pipeline_runs (state,pipeline_spec_id,finished_at,outputs,all_errors,fatal_errors,created_at) VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING *`, status, pipelineSpecID, finishedAt, outputs, allErrors, fatalErrors))
require.NoError(t, db.Get(&run, `INSERT INTO pipeline_runs (state,pipeline_spec_id,pruning_key,finished_at,outputs,all_errors,fatal_errors,created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW()) RETURNING *`, status, pipelineSpecID, jobID, finishedAt, outputs, allErrors, fatalErrors))
return run
}

Expand Down
1 change: 1 addition & 0 deletions core/services/pipeline/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Run struct {
ID int64 `json:"-"`
JobID int32 `json:"-"`
PipelineSpecID int32 `json:"-"`
PruningKey int32 `json:"-"` // This currently refers to the upstream job ID
PipelineSpec Spec `json:"pipelineSpec"`
Meta JSONSerializable `json:"meta"`
// The errors are only ever strings
Expand Down
21 changes: 13 additions & 8 deletions core/services/pipeline/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/rand"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/utils/hex"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand Down Expand Up @@ -715,8 +717,10 @@ func Test_Prune(t *testing.T) {

ps1 := cltest.MustInsertPipelineSpec(t, db)

jobID := rand.Int32()

t.Run("when there are no runs to prune, does nothing", func(t *testing.T) {
porm.Prune(db, ps1.ID)
porm.Prune(db, jobID)

// no error logs; it did nothing
assert.Empty(t, observed.All())
Expand All @@ -725,30 +729,31 @@ func Test_Prune(t *testing.T) {
// ps1 has:
// - 20 completed runs
for i := 0; i < 20; i++ {
cltest.MustInsertPipelineRunWithStatus(t, db, ps1.ID, pipeline.RunStatusCompleted)
cltest.MustInsertPipelineRunWithStatus(t, db, ps1.ID, pipeline.RunStatusCompleted, jobID)
}

ps2 := cltest.MustInsertPipelineSpec(t, db)

jobID2 := rand.Int32()
// ps2 has:
// - 12 completed runs
// - 3 errored runs
// - 3 running run
// - 3 running runs
// - 3 suspended run
for i := 0; i < 12; i++ {
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusCompleted)
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusCompleted, jobID2)
}
for i := 0; i < 3; i++ {
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusErrored)
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusErrored, jobID2)
}
for i := 0; i < 3; i++ {
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusRunning)
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusRunning, jobID2)
}
for i := 0; i < 3; i++ {
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusSuspended)
cltest.MustInsertPipelineRunWithStatus(t, db, ps2.ID, pipeline.RunStatusSuspended, jobID2)
}

porm.Prune(db, ps2.ID)
porm.Prune(db, jobID2)

cnt := pgtest.MustCount(t, db, "SELECT count(*) FROM pipeline_runs WHERE pipeline_spec_id = $1 AND state = $2", ps1.ID, pipeline.RunStatusCompleted)
assert.Equal(t, cnt, 20)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ WHERE pjps.pipeline_spec_id = pipeline_runs.pipeline_spec_id;

ALTER TABLE pipeline_runs ALTER COLUMN pruning_key SET NOT NULL;

ALTER TABLE pipeline_runs ADD CONSTRAINT fk_pipeline_runs_job FOREIGN KEY (pruning_key) REFERENCES jobs(id);
ALTER TABLE pipeline_runs ADD CONSTRAINT fk_pipeline_runs_job FOREIGN KEY (pruning_key) REFERENCES jobs(id) ON DELETE CASCADE DEFERRABLE;
-- +goose StatementEnd

-- +goose Down
Expand Down

0 comments on commit 0831653

Please sign in to comment.