From 4e45352582ebe656d835f023da6403bb374520d6 Mon Sep 17 00:00:00 2001 From: Amir Y <83904651+amirylm@users.noreply.github.com> Date: Thu, 31 Aug 2023 17:30:51 +0300 Subject: [PATCH] Fix recoverer cleanup (#10417) * fix: remove log from pending, not just from visited cache * recoverer: add/rm pending * check removePending --- .../ocr2keeper/evm21/logprovider/recoverer.go | 51 +++++++++--- .../evm21/logprovider/recoverer_test.go | 78 ++++++++++++++++--- 2 files changed, 109 insertions(+), 20 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 6e1dd4d4b39..7ac981e9952 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -400,7 +400,7 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller. visitedAt: time.Now(), payload: payload, } - r.pending = append(r.pending, payload) + r.addPending(payload) } return len(r.pending) - pendingSizeBefore, alreadyPending } @@ -511,26 +511,23 @@ func (r *logRecoverer) clean(ctx context.Context) { lggr.Debug("no expired upkeeps") return } - cleaned, err := r.tryExpire(ctx, expired...) + err := r.tryExpire(ctx, expired...) if err != nil { lggr.Warnw("failed to clean visited upkeeps", "err", err) } - if len(expired) > 0 { - lggr.Debugw("expired upkeeps", "expired", len(expired), "cleaned", cleaned) - } } -func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error) { +func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { latestBlock, err := r.poller.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { - return 0, fmt.Errorf("failed to get latest block: %w", err) + return fmt.Errorf("failed to get latest block: %w", err) } sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) states, err := r.states.SelectByWorkIDs(ctx, ids...) if err != nil { - return 0, fmt.Errorf("failed to get states: %w", err) + return fmt.Errorf("failed to get states: %w", err) } lggr := r.lggr.With("where", "clean") start, _ := r.getRecoveryWindow(latestBlock) @@ -550,12 +547,13 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start { // we can't recover this log anymore, so we remove it from the visited list lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, - "logBlock", logBlock, "start", start) + "latestBlock", latestBlock, "logBlock", logBlock, "start", start) + r.removePending(rec.payload.WorkID) delete(r.visited, ids[i]) removed++ continue } - r.pending = append(r.pending, rec.payload) + r.addPending(rec.payload) rec.visitedAt = time.Now() r.visited[ids[i]] = rec default: @@ -564,5 +562,36 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error } } - return removed, nil + if removed > 0 { + lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed) + } + + return nil +} + +// addPending adds a payload to the pending list if it's not already there. +// NOTE: the lock must be held before calling this function. +func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) { + var exist bool + pending := r.pending + for _, p := range pending { + if p.WorkID == payload.WorkID { + exist = true + } + } + if !exist { + r.pending = append(pending, payload) + } +} + +// removePending removes a payload from the pending list. +// NOTE: the lock must be held before calling this function. +func (r *logRecoverer) removePending(workID string) { + updated := make([]ocr2keepers.UpkeepPayload, 0, len(r.pending)) + for _, p := range r.pending { + if p.WorkID != workID { + updated = append(updated, p) + } + } + r.pending = updated } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index e1024e585ea..7f01434c2b9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -3,6 +3,7 @@ package logprovider import ( "context" "fmt" + "math" "math/big" "sort" "testing" @@ -97,11 +98,12 @@ func TestLogRecoverer_GetRecoverables(t *testing.T) { } func TestLogRecoverer_Clean(t *testing.T) { + oldLogsOffset := int64(20) + tests := []struct { name string pending []ocr2keepers.UpkeepPayload visited map[string]visitedRecord - latestBlock int64 states []ocr2keepers.UpkeepState wantPending []ocr2keepers.UpkeepPayload wantVisited []string @@ -110,7 +112,6 @@ func TestLogRecoverer_Clean(t *testing.T) { "empty", []ocr2keepers.UpkeepPayload{}, map[string]visitedRecord{}, - 0, []ocr2keepers.UpkeepState{}, []ocr2keepers.UpkeepPayload{}, []string{}, @@ -120,13 +121,14 @@ func TestLogRecoverer_Clean(t *testing.T) { []ocr2keepers.UpkeepPayload{ {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")}, + {WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "3")}, }, map[string]visitedRecord{ "1": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{ WorkID: "1", Trigger: ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 50, + BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), }, }, }}, @@ -134,7 +136,7 @@ func TestLogRecoverer_Clean(t *testing.T) { WorkID: "2", Trigger: ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 50, + BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), }, }, }}, @@ -142,7 +144,7 @@ func TestLogRecoverer_Clean(t *testing.T) { WorkID: "3", Trigger: ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 50, + BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset - 10), }, }, }}, @@ -150,14 +152,13 @@ func TestLogRecoverer_Clean(t *testing.T) { WorkID: "4", Trigger: ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 50, + BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset + 10), }, }, }}, }, - 200, []ocr2keepers.UpkeepState{ - ocr2keepers.Ineligible, + ocr2keepers.UnknownState, ocr2keepers.UnknownState, }, []ocr2keepers.UpkeepPayload{ @@ -176,8 +177,10 @@ func TestLogRecoverer_Clean(t *testing.T) { lookbackBlocks := int64(100) r, _, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) + start, _ := r.getRecoveryWindow(0) + block24h := int64(math.Abs(float64(start))) - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, nil) + lp.On("LatestBlock", mock.Anything).Return(block24h+oldLogsOffset, nil) statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil) r.lock.Lock() @@ -914,6 +917,63 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { } } +func TestLogRecoverer_pending(t *testing.T) { + tests := []struct { + name string + exist []ocr2keepers.UpkeepPayload + new []ocr2keepers.UpkeepPayload + want []ocr2keepers.UpkeepPayload + }{ + { + "empty", + []ocr2keepers.UpkeepPayload{}, + []ocr2keepers.UpkeepPayload{}, + []ocr2keepers.UpkeepPayload{}, + }, + { + "add new and existing", + []ocr2keepers.UpkeepPayload{ + {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + }, + []ocr2keepers.UpkeepPayload{ + {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")}, + }, + []ocr2keepers.UpkeepPayload{ + {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200)) + r.lock.Lock() + r.pending = tc.exist + for _, p := range tc.new { + r.addPending(p) + } + pending := r.pending + require.GreaterOrEqual(t, len(pending), len(tc.new)) + require.Equal(t, len(tc.want), len(pending)) + sort.Slice(pending, func(i, j int) bool { + return pending[i].WorkID < pending[j].WorkID + }) + for i := range pending { + require.Equal(t, tc.want[i].WorkID, pending[i].WorkID) + } + r.lock.Unlock() + for _, p := range tc.want { + r.removePending(p.WorkID) + } + r.lock.Lock() + defer r.lock.Unlock() + require.Equal(t, 0, len(r.pending)) + }) + } +} + type mockFilterStore struct { UpkeepFilterStore HasFn func(id *big.Int) bool