From bd080231a8df5b94a0251a9576f7c8e063b5c047 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Mon, 8 Jul 2024 15:57:51 +0100 Subject: [PATCH] Add logs WIP track recoverables with maps Track time Add logs Timestamp recover Bump recovery batch size --- .../evmregistry/v21/logprovider/buffer_v1.go | 6 +- .../evmregistry/v21/logprovider/provider.go | 1 - .../evmregistry/v21/logprovider/recoverer.go | 246 +- .../v21/logprovider/recoverer_test.go | 2410 ++++++++--------- 4 files changed, 1352 insertions(+), 1311 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index e58d5ad9c93..4d72ef7e87d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -281,7 +281,7 @@ const ( logTriggerStateDropped logTriggerState = iota // the log was enqueued by the buffer logTriggerStateEnqueued - // the log was visited/dequeued from the buffer + // the log was queuedForRecovery/dequeued from the buffer logTriggerStateDequeued ) @@ -293,7 +293,7 @@ type logTriggerStateEntry struct { } // upkeepLogQueue is a priority queue for logs associated to a specific upkeep. -// It keeps track of the logs that were already visited and the capacity of the queue. +// It keeps track of the logs that were already queuedForRecovery and the capacity of the queue. type upkeepLogQueue struct { lggr logger.Logger @@ -412,7 +412,7 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo if added > 0 { q.orderLogs() dropped = q.clean(blockThreshold) - q.lggr.Debugw("Enqueued logs", "added", added, "dropped", dropped, "blockThreshold", blockThreshold, "q size", len(q.logs), "visited size", len(q.states)) + q.lggr.Debugw("Enqueued logs", "added", added, "dropped", dropped, "blockThreshold", blockThreshold, "q size", len(q.logs), "queuedForRecovery size", len(q.states)) } prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionIngress).Add(float64(added)) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index f1de1ef5129..669ab9c56f2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -240,7 +240,6 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up } payloads = p.minimumCommitmentDequeue(latestBlock, start) - // if we have remaining capacity following minimum commitment dequeue, perform a best effort dequeue if len(payloads) < MaxPayloads { payloads = p.bestEffortDequeue(latestBlock, start, payloads) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index 9e41008ed83..042e975a49f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -1,7 +1,6 @@ package logprovider import ( - "bytes" "context" "crypto/rand" "errors" @@ -43,7 +42,7 @@ var ( // MaxProposals is the maximum number of proposals that can be returned by GetRecoveryProposals MaxProposals = 20 // recoveryBatchSize is the number of filters to recover in a single batch - recoveryBatchSize = 10 + recoveryBatchSize = 100 // recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs recoveryLogsBuffer = int64(200) recoveryLogsBurst = int64(500) @@ -79,8 +78,10 @@ type logRecoverer struct { interval time.Duration lock sync.RWMutex - pending []ocr2keepers.UpkeepPayload - visited map[string]visitedRecord + pendingKeys []string + pendingPayloads map[string]ocr2keepers.UpkeepPayload + upkeepPayloadCount map[ocr2keepers.UpkeepIdentifier]int + queuedForRecovery map[string]visitedRecord filterStore UpkeepFilterStore states core.UpkeepStateReader @@ -104,14 +105,16 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie lookbackBlocks: new(atomic.Int64), interval: opts.ReadInterval * 5, - pending: make([]ocr2keepers.UpkeepPayload, 0), - visited: make(map[string]visitedRecord), - poller: poller, - filterStore: filterStore, - states: stateStore, - packer: packer, - client: client, - blockTimeResolver: newBlockTimeResolver(poller), + pendingKeys: make([]string, 0), + pendingPayloads: map[string]ocr2keepers.UpkeepPayload{}, + queuedForRecovery: make(map[string]visitedRecord), + upkeepPayloadCount: make(map[ocr2keepers.UpkeepIdentifier]int), + poller: poller, + filterStore: filterStore, + states: stateStore, + packer: packer, + client: client, + blockTimeResolver: newBlockTimeResolver(poller), finalityDepth: opts.FinalityDepth, } @@ -285,6 +288,11 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2 } func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) { + start := time.Now() + defer func() { + r.lggr.Debugw("GetRecoveryProposals finished", "time", time.Since(start)) + }() + latestBlock, err := r.poller.LatestBlock(ctx) if err != nil { return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) @@ -293,7 +301,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. r.lock.Lock() defer r.lock.Unlock() - if len(r.pending) == 0 { + if len(r.pendingKeys) == 0 { return nil, nil } @@ -302,26 +310,33 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. r.sortPending(uint64(latestBlock.BlockNumber)) - var results, pending []ocr2keepers.UpkeepPayload - for _, payload := range r.pending { + var results []ocr2keepers.UpkeepPayload + var pending []string + + for _, workID := range r.pendingKeys { if allLogsCounter >= MaxProposals { // we have enough proposals, the rest are pushed back to pending - pending = append(pending, payload) + pending = append(pending, workID) continue } + + payload := r.pendingPayloads[workID] + uid := payload.UpkeepID.String() if logsCount[uid] >= AllowedLogsPerUpkeep { // we have enough proposals for this upkeep, the rest are pushed back to pending - pending = append(pending, payload) + pending = append(pending, workID) continue } results = append(results, payload) + delete(r.pendingPayloads, workID) + logsCount[uid]++ allLogsCounter++ } - r.pending = pending - prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pending))) + r.pendingKeys = pending + prommetrics.AutomationRecovererPendingPayloads.Set(float64(len(r.pendingKeys))) r.lggr.Debugf("found %d recoverable payloads", len(results)) @@ -329,64 +344,52 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. } func (r *logRecoverer) recover(ctx context.Context) error { + startTime := time.Now() + defer func() { + r.lggr.Debugw("recover finished", "time", time.Since(startTime)) + }() + latest, err := r.poller.LatestBlock(ctx) if err != nil { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - start, offsetBlock := r.getRecoveryWindow(latest.BlockNumber) - if offsetBlock < 0 { + recoveryWindowStart, recoveryWindowEnd := r.getRecoveryWindow(latest.BlockNumber) + if recoveryWindowEnd < 0 { // too soon to recover, we don't have enough blocks return nil } - if start < 0 { - start = 0 + if recoveryWindowStart < 0 { + recoveryWindowStart = 0 } - filters := r.getFilterBatch(offsetBlock) + // only get filters that have not been updated after the end of the recovery window + // for all filters that are eligible based on this criteria, select 5 at random, and 5 based on oldest last repoll block + filters := r.getFilterBatch(recoveryWindowEnd) if len(filters) == 0 { return nil } - r.lggr.Debugw("recovering logs", "filters", filters, "startBlock", start, "offsetBlock", offsetBlock, "latestBlock", latest) + r.lggr.Debugw("recovering logs", "filters", filters, "startBlock", recoveryWindowStart, "recoveryWindowEnd", recoveryWindowEnd, "latestBlock", latest) - var wg sync.WaitGroup + // for up to 10 filters, recover each filter for _, f := range filters { - wg.Add(1) - go func(f upkeepFilter) { - defer wg.Done() - if err := r.recoverFilter(ctx, f, start, offsetBlock); err != nil { - r.lggr.Debugw("error recovering filter", "err", err.Error()) - } - }(f) + if err := r.recoverFilter(ctx, f, recoveryWindowStart, recoveryWindowEnd); err != nil { + r.lggr.Debugw("error recovering filter", "err", err.Error()) + } } - wg.Wait() return nil } // recoverFilter recovers logs for a single upkeep filter. func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error { - start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out - // ensure we don't recover logs from before the filter was created - if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock { - // NOTE: we expect that configUpdateBlock <= offsetBlock, as others would have been filtered out - start = configUpdateBlock - } - if start < startBlock { - start = startBlock - } - end := start + recoveryLogsBuffer - if offsetBlock-end > 100*recoveryLogsBuffer { - // If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow - // a range of recoveryLogsBurst - // Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts - // (while also taking into account existing pending payloads) - end = start + recoveryLogsBurst - } - if end > offsetBlock { - end = offsetBlock - } + startTime := time.Now() + defer func() { + r.lggr.Debugw("recoverFilter finished", "time", time.Since(startTime)) + }() + + start, end := r.getBlockRange(f, startBlock, offsetBlock) // we expect start to be > offsetBlock in any case logs, err := r.poller.LogsWithSigs(ctx, start, end, f.topics, common.BytesToAddress(f.addr)) if err != nil { @@ -433,14 +436,52 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB return nil } +// getBlockRange calculates the block range to work with; +// we first identify the last block polled for a particular filter, and bump forward by one +// we then fast forward to the config update block if thats newer +// we then fast forward to the beginning of the recovery window if that's newer +// we then set an end range of 200 blocks after the start block +// if the end of the recovery window is very far ahead of the end range, set the end range to 500 blocks after the start +// if the end exceeds the end of the recovery window, set the end to the end of the recovery window +func (r *logRecoverer) getBlockRange(f upkeepFilter, startBlock int64, offsetBlock int64) (int64, int64) { + start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out + // ensure we don't recover logs from before the filter was created + if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock { + // NOTE: we expect that configUpdateBlock <= offsetBlock, as others would have been filtered out + start = configUpdateBlock + } + if start < startBlock { + start = startBlock + } + end := start + recoveryLogsBuffer + if offsetBlock-end > 100*recoveryLogsBuffer { + // If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow + // a range of recoveryLogsBurst + // Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts + // (while also taking into account existing pending payloads) + end = start + recoveryLogsBurst + r.lggr.Debugw("recoverer is lagging, set new end range", "start", start, "recoveryLogsBurst", recoveryLogsBurst, "end", end) + } + if end > offsetBlock { + end = offsetBlock + r.lggr.Debugw("end range has exceeded recovery window, set new end range", "end", end) + } + return start, end +} + // populatePending adds the logs to the pending list if they are not already pending. // returns the number of logs added, the number of logs that were already pending, // and a flag that indicates whether some errors happened while we are trying to add to pending q. func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) { + start := time.Now() + defer func() { + r.lggr.Debugw("populatePending finished", "time", time.Since(start)) + }() + r.lock.Lock() defer r.lock.Unlock() - pendingSizeBefore := len(r.pending) + pendingSizeBefore := len(r.pendingKeys) alreadyPending := 0 errs := make([]error, 0) for _, log := range filteredLogs { @@ -455,7 +496,7 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller. continue } wid := core.UpkeepWorkID(*upkeepId, trigger) - if _, ok := r.visited[wid]; ok { + if _, ok := r.queuedForRecovery[wid]; ok { alreadyPending++ continue } @@ -473,13 +514,13 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller. if err := r.addPending(payload); err != nil { errs = append(errs, err) } else { - r.visited[wid] = visitedRecord{ + r.queuedForRecovery[wid] = visitedRecord{ visitedAt: time.Now(), payload: payload, } } } - return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0 + return len(r.pendingKeys) - pendingSizeBefore, alreadyPending, len(errs) == 0 } // filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible). @@ -515,11 +556,11 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) { } // getFilterBatch returns a batch of filters that are ready to be recovered. -func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter { +func (r *logRecoverer) getFilterBatch(recoveryWindowEnd int64) []upkeepFilter { filters := r.filterStore.GetFilters(func(f upkeepFilter) bool { // ensure we work only on filters that are ready to be recovered // no need to recover in case f.configUpdateBlock is after offsetBlock - return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock + return f.lastRePollBlock < recoveryWindowEnd && int64(f.configUpdateBlock) <= recoveryWindowEnd }) sort.Slice(filters, func(i, j int) bool { @@ -584,9 +625,14 @@ func logToTrigger(log logpoller.Log) ocr2keepers.Trigger { } func (r *logRecoverer) clean(ctx context.Context) { + start := time.Now() + defer func() { + r.lggr.Debugw("clean finished", "time", time.Since(start)) + }() + r.lock.RLock() var expired []string - for id, t := range r.visited { + for id, t := range r.queuedForRecovery { if time.Since(t.visitedAt) > RecoveryCacheTTL { expired = append(expired, id) } @@ -599,7 +645,7 @@ func (r *logRecoverer) clean(ctx context.Context) { } err := r.tryExpire(ctx, expired...) if err != nil { - lggr.Warnw("failed to clean visited upkeeps", "err", err) + lggr.Warnw("failed to clean queuedForRecovery upkeeps", "err", err) } } @@ -625,26 +671,26 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { case ocr2keepers.UnknownState: // in case the state is unknown, we can't be sure if the upkeep was performed or not // so we push it back to the pending list - rec, ok := r.visited[ids[i]] + rec, ok := r.queuedForRecovery[ids[i]] if !ok { // in case it was removed by another thread continue } if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start { - // we can't recover this log anymore, so we remove it from the visited list + // we can't recover this log anymore, so we remove it from the queuedForRecovery list lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, "latestBlock", latestBlock, "logBlock", logBlock, "start", start) - r.removePending(rec.payload.WorkID) - delete(r.visited, ids[i]) + r.removePending(rec.payload) + delete(r.queuedForRecovery, ids[i]) removed++ continue } if err := r.addPending(rec.payload); err == nil { rec.visitedAt = time.Now() - r.visited[ids[i]] = rec + r.queuedForRecovery[ids[i]] = rec } default: - delete(r.visited, ids[i]) + delete(r.queuedForRecovery, ids[i]) removed++ } } @@ -659,22 +705,22 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { // addPending adds a payload to the pending list if it's not already there. // NOTE: the lock must be held before calling this function. func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { - var exist bool - pending := r.pending - upkeepPayloads := 0 - for _, p := range pending { - if bytes.Equal(p.UpkeepID[:], payload.UpkeepID[:]) { - upkeepPayloads++ - } - if p.WorkID == payload.WorkID { - exist = true - } - } - if upkeepPayloads >= maxPendingPayloadsPerUpkeep { + start := time.Now() + defer func() { + r.lggr.Debugw("addPending finished", "time", time.Since(start)) + }() + + if upkeepPayloads := r.upkeepPayloadCount[payload.UpkeepID]; upkeepPayloads >= maxPendingPayloadsPerUpkeep { return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID) } - if !exist { - r.pending = append(pending, payload) + if _, ok := r.pendingPayloads[payload.WorkID]; !ok { + r.pendingKeys = append(r.pendingKeys, payload.WorkID) + r.pendingPayloads[payload.WorkID] = payload + if count, ok := r.upkeepPayloadCount[payload.UpkeepID]; ok { + r.upkeepPayloadCount[payload.UpkeepID] = count + 1 + } else { + r.upkeepPayloadCount[payload.UpkeepID] = 1 + } prommetrics.AutomationRecovererPendingPayloads.Inc() } return nil @@ -682,35 +728,51 @@ func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { // removePending removes a payload from the pending list. // NOTE: the lock must be held before calling this function. -func (r *logRecoverer) removePending(workID string) { - updated := make([]ocr2keepers.UpkeepPayload, 0, len(r.pending)) - for _, p := range r.pending { - if p.WorkID != workID { +func (r *logRecoverer) removePending(payload ocr2keepers.UpkeepPayload) { + start := time.Now() + defer func() { + r.lggr.Debugw("removePending finished", "time", time.Since(start)) + }() + + updated := make([]string, 0, len(r.pendingKeys)) + for _, p := range r.pendingKeys { + if p != payload.WorkID { updated = append(updated, p) } else { prommetrics.AutomationRecovererPendingPayloads.Dec() } } - r.pending = updated + delete(r.pendingPayloads, payload.WorkID) + if count, ok := r.upkeepPayloadCount[payload.UpkeepID]; ok { + if count > 0 { + r.upkeepPayloadCount[payload.UpkeepID] = count - 1 + } + } + r.pendingKeys = updated } // sortPending sorts the pending list by a random order based on the normalized latest block number. // Divided by 10 to ensure that nodes with similar block numbers won't end up with different order. // NOTE: the lock must be held before calling this function. func (r *logRecoverer) sortPending(latestBlock uint64) { + start := time.Now() + defer func() { + r.lggr.Debugw("sortPending finished", "time", time.Since(start)) + }() + normalized := latestBlock / 100 if normalized == 0 { normalized = 1 } randSeed := random.GetRandomKeySource(nil, normalized) - shuffledIDs := make(map[string]string, len(r.pending)) - for _, p := range r.pending { - shuffledIDs[p.WorkID] = random.ShuffleString(p.WorkID, randSeed) + shuffledIDs := make(map[string]string, len(r.pendingKeys)) + for _, p := range r.pendingKeys { + shuffledIDs[p] = random.ShuffleString(p, randSeed) } - sort.SliceStable(r.pending, func(i, j int) bool { - return shuffledIDs[r.pending[i].WorkID] < shuffledIDs[r.pending[j].WorkID] + sort.SliceStable(r.pendingKeys, func(i, j int) bool { + return shuffledIDs[r.pendingKeys[i]] < shuffledIDs[r.pendingKeys[j]] }) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go index 65a05b2537e..75a288798b6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer_test.go @@ -2,1200 +2,1179 @@ package logprovider import ( "context" - "fmt" - "math" - "math/big" - "sort" - "testing" - "time" - - types2 "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - lpmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core/mocks" ) -func TestLogRecoverer_GetRecoverables(t *testing.T) { - ctx := testutils.Context(t) - lp := &lpmocks.LogPoller{} - lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) - r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) - - tests := []struct { - name string - pending []ocr2keepers.UpkeepPayload - want []ocr2keepers.UpkeepPayload - wantErr bool - }{ - { - "empty", - []ocr2keepers.UpkeepPayload{}, - []ocr2keepers.UpkeepPayload{}, - false, - }, - { - "happy flow", - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - false, - }, - { - "rate limiting", - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "5", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "6", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "5", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - false, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - r.lock.Lock() - r.pending = tc.pending - r.lock.Unlock() - - got, err := r.GetRecoveryProposals(ctx) - if tc.wantErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - require.Len(t, got, len(tc.want)) - }) - } -} - -func TestLogRecoverer_Clean(t *testing.T) { - oldLogsOffset := int64(20) - - tests := []struct { - name string - pending []ocr2keepers.UpkeepPayload - visited map[string]visitedRecord - states []ocr2keepers.UpkeepState - wantPending []ocr2keepers.UpkeepPayload - wantVisited []string - }{ - { - "empty", - []ocr2keepers.UpkeepPayload{}, - map[string]visitedRecord{}, - []ocr2keepers.UpkeepState{}, - []ocr2keepers.UpkeepPayload{}, - []string{}, - }, - { - "clean expired", - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "3")}, - }, - map[string]visitedRecord{ - "1": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{ - WorkID: "1", - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), - }, - }, - }}, - "2": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{ - WorkID: "2", - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), - }, - }, - }}, - "3": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{ - WorkID: "3", - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset - 10), - }, - }, - }}, - "4": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{ - WorkID: "4", - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset + 10), - }, - }, - }}, - }, - []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.UnknownState, - }, - []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "4")}, - }, - []string{"1", "2", "4"}, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(testutils.Context(t)) - defer cancel() - - lookbackBlocks := int64(100) - r, _, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) - start, _ := r.getRecoveryWindow(0) - block24h := int64(math.Abs(float64(start))) - - lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: block24h + oldLogsOffset}, nil) - statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil) - - r.lock.Lock() - r.pending = tc.pending - r.visited = tc.visited - r.lock.Unlock() - - r.clean(ctx) - - r.lock.RLock() - defer r.lock.RUnlock() - - pending := r.pending - require.Equal(t, len(tc.wantPending), len(pending)) - sort.Slice(pending, func(i, j int) bool { - return pending[i].WorkID < pending[j].WorkID - }) - for i := range pending { - require.Equal(t, tc.wantPending[i].WorkID, pending[i].WorkID) - } - require.Equal(t, len(tc.wantVisited), len(r.visited)) - for _, id := range tc.wantVisited { - _, ok := r.visited[id] - require.True(t, ok) - } - }) - } -} - -func TestLogRecoverer_Recover(t *testing.T) { - ctx := testutils.Context(t) - - tests := []struct { - name string - lookbackBlocks int64 - latestBlock int64 - latestBlockErr error - active []upkeepFilter - states []ocr2keepers.UpkeepState - statesErr error - logs []logpoller.Log - logsErr error - recoverErr error - proposalsWorkIDs []string - lastRePollBlocks []int64 - }{ - { - "no filters", - 200, - 300, - nil, - []upkeepFilter{}, - []ocr2keepers.UpkeepState{}, - nil, - []logpoller.Log{}, - nil, - nil, - []string{}, - []int64{}, - }, - { - "latest block error", - 200, - 0, - fmt.Errorf("test error"), - []upkeepFilter{}, - []ocr2keepers.UpkeepState{}, - nil, - []logpoller.Log{}, - nil, - fmt.Errorf("test error"), - []string{}, - []int64{}, - }, - { - "states error", - 100, - 200, - nil, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x1"), - }, - }, - }, - nil, - fmt.Errorf("test error"), - []logpoller.Log{ - { - BlockNumber: 2, - TxHash: common.HexToHash("0x111"), - LogIndex: 1, - BlockHash: common.HexToHash("0x2"), - }, - }, - nil, - nil, - []string{}, - []int64{0}, - }, - { - "get logs error", - 200, - 300, - nil, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x1"), - }, - }, - }, - []ocr2keepers.UpkeepState{}, - nil, - []logpoller.Log{}, - fmt.Errorf("test error"), - nil, - []string{}, - []int64{0}, - }, - { - "happy flow", - 100, - 500, - nil, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x1"), - }, - }, - { - upkeepID: big.NewInt(2), - addr: common.HexToAddress("0x2").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x2"), - }, - configUpdateBlock: 450, // should be filtered out - }, - { - upkeepID: big.NewInt(3), - addr: common.HexToAddress("0x2").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x2"), - }, - lastRePollBlock: 450, // should be filtered out, as its higher than latest-lookback - }, - }, - []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, - nil, - []logpoller.Log{ - { - BlockNumber: 2, - TxHash: common.HexToHash("0x111"), - LogIndex: 1, - BlockHash: common.HexToHash("0x2"), - }, - }, - nil, - nil, - []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, - []int64{201, 0, 450}, - }, - { - "lastRePollBlock updated with burst when lagging behind", - 100, - 50000, - nil, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x1"), - }, - lastRePollBlock: 99, // Should be updated with burst - }, - }, - []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, - nil, - []logpoller.Log{ - { - BlockNumber: 2, - TxHash: common.HexToHash("0x111"), - LogIndex: 1, - BlockHash: common.HexToHash("0x2"), - }, - }, - nil, - nil, - []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, - []int64{600}, - }, - { - "recovery starts at configUpdateBlock if higher than lastRePollBlock", - 100, - 5000, - nil, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - topics: []common.Hash{ - common.HexToHash("0x1"), - }, - lastRePollBlock: 100, - configUpdateBlock: 500, - }, - }, - []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, - nil, - []logpoller.Log{ - { - BlockNumber: 2, - TxHash: common.HexToHash("0x111"), - LogIndex: 1, - BlockHash: common.HexToHash("0x2"), - }, - }, - nil, - nil, - []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, - []int64{700}, // should be configUpdateBlock + recoveryLogsBuffer - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - lookbackBlocks := int64(100) - recoverer, filterStore, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) - - filterStore.AddActiveUpkeeps(tc.active...) - lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) - lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, tc.logsErr) - statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, tc.statesErr) - - err := recoverer.recover(ctx) - if tc.recoverErr != nil { - require.Error(t, err) - return - } - require.NoError(t, err) - for i, active := range tc.active { - filters := filterStore.GetFilters(func(f upkeepFilter) bool { - return f.upkeepID.String() == active.upkeepID.String() - }) - require.Equal(t, 1, len(filters)) - require.Equal(t, tc.lastRePollBlocks[i], filters[0].lastRePollBlock) - } - - proposals, err := recoverer.GetRecoveryProposals(ctx) - require.NoError(t, err) - require.Equal(t, len(tc.proposalsWorkIDs), len(proposals)) - if len(proposals) > 0 { - sort.Slice(proposals, func(i, j int) bool { - return proposals[i].WorkID < proposals[j].WorkID - }) - } - for i := range proposals { - require.Equal(t, tc.proposalsWorkIDs[i], proposals[i].WorkID) - } - }) - } -} - -func TestLogRecoverer_SelectFilterBatch(t *testing.T) { - n := recoveryBatchSize*2 + 2 - filters := []upkeepFilter{} - for i := 0; i < n; i++ { - filters = append(filters, upkeepFilter{ - upkeepID: big.NewInt(int64(i)), - }) - } - recoverer, _, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) - - batch := recoverer.selectFilterBatch(filters) - require.Equal(t, recoveryBatchSize, len(batch)) - - batch = recoverer.selectFilterBatch(filters[:recoveryBatchSize/2]) - require.Equal(t, recoveryBatchSize/2, len(batch)) -} - -func TestLogRecoverer_getFilterBatch(t *testing.T) { - tests := []struct { - name string - offsetBlock int64 - filters []upkeepFilter - want int - }{ - { - "empty", - 2, - []upkeepFilter{}, - 0, - }, - { - "filter out of range", - 100, - []upkeepFilter{ - { - upkeepID: big.NewInt(1), - addr: common.HexToAddress("0x1").Bytes(), - lastRePollBlock: 50, - }, - { - upkeepID: big.NewInt(2), - addr: common.HexToAddress("0x2").Bytes(), - lastRePollBlock: 50, - configUpdateBlock: 101, // out of range - }, - { - upkeepID: big.NewInt(3), - addr: common.HexToAddress("0x3").Bytes(), - configUpdateBlock: 99, - }, - }, - 2, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - recoverer, filterStore, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) - filterStore.AddActiveUpkeeps(tc.filters...) - batch := recoverer.getFilterBatch(tc.offsetBlock) - require.Equal(t, tc.want, len(batch)) - }) - } -} - -func TestLogRecoverer_FilterFinalizedStates(t *testing.T) { - tests := []struct { - name string - logs []logpoller.Log - states []ocr2keepers.UpkeepState - want []logpoller.Log - }{ - { - "empty", - []logpoller.Log{}, - []ocr2keepers.UpkeepState{}, - []logpoller.Log{}, - }, - { - "happy flow", - []logpoller.Log{ - {LogIndex: 0}, {LogIndex: 2}, {LogIndex: 2}, - }, - []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - ocr2keepers.Performed, - ocr2keepers.Ineligible, - }, - []logpoller.Log{ - {LogIndex: 0}, - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - recoverer, _, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) - state := recoverer.filterFinalizedStates(upkeepFilter{}, tc.logs, tc.states) - require.Equal(t, len(tc.want), len(state)) - for i := range state { - require.Equal(t, tc.want[i].LogIndex, state[i].LogIndex) - } - }) - } -} - -func TestLogRecoverer_GetProposalData(t *testing.T) { - for _, tc := range []struct { - name string - proposal ocr2keepers.CoordinatedBlockProposal - skipFilter bool - filterStore UpkeepFilterStore - logPoller logpoller.LogPoller - client client.Client - stateReader core.UpkeepStateReader - wantBytes []byte - expectErr bool - wantErr error - }{ - { - name: "passing an empty proposal with an empty upkeep ID returns an error", - proposal: ocr2keepers.CoordinatedBlockProposal{}, - expectErr: true, - wantErr: errors.New("not a log trigger upkeep ID"), - }, - { - name: "if a filter is not found for the upkeep ID, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - }, - skipFilter: true, - expectErr: true, - wantErr: errors.New("filter not found for upkeep 452312848583266388373324160190187140457511065560374322131410487042692349952"), - }, - { - name: "if an error is encountered fetching the latest block, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 0, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 0, errors.New("latest block boom") - }, - }, - expectErr: true, - wantErr: errors.New("latest block boom"), - }, - { - name: "if an error is encountered fetching the tx receipt, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 0, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - return errors.New("tx receipt boom") - }, - }, - expectErr: true, - wantErr: errors.New("tx receipt boom"), - }, - { - name: "if the tx block is nil, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 0, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - return nil - }, - }, - expectErr: true, - wantErr: errors.New("failed to get tx block"), - }, - { - name: "if a log trigger extension block number is 0, and the block number on the tx receipt is not recoverable, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 0, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(200) - return nil - }, - }, - expectErr: true, - wantErr: errors.New("log block is not recoverable"), - }, - { - name: "if a log block is not recoverable, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 200, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(200) - return nil - }, - }, - expectErr: true, - wantErr: errors.New("log block is not recoverable"), - }, - { - name: "if a log block has does not match, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 200, - BlockHash: common.HexToHash("0x2"), - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 100, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(200) - receipt.BlockHash = common.HexToHash("0x1") - return nil - }, - }, - expectErr: true, - wantErr: errors.New("log tx reorged"), - }, - { - name: "if a log block is recoverable, when the upkeep state reader errors, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 80, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return nil, errors.New("upkeep state boom") - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - return nil - }, - }, - expectErr: true, - wantErr: errors.New("upkeep state boom"), - }, - { - name: "if a log block is recoverable, when the upkeep state reader returns a non recoverable state, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 80, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.Ineligible, - }, nil - }, - }, - expectErr: true, - wantErr: errors.New("upkeep state is not recoverable"), - }, - { - name: "if a log block is recoverable, when the filter address is empty, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 80, - }, - }, - }, - filterStore: &mockFilterStore{ - HasFn: func(id *big.Int) bool { - return true - }, - RangeFiltersByIDsFn: func(iterator func(int, upkeepFilter), ids ...*big.Int) { - - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - }, nil - }, - }, - expectErr: true, - wantErr: errors.New("invalid filter found for upkeepID 452312848583266388373324160190187140457511065560374322131410487042692349952"), - }, - { - name: "if a log block is recoverable, when the log poller returns an error fetching logs, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 80, - }, - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return nil, errors.New("logs with sigs boom") - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - }, nil - }, - }, - expectErr: true, - wantErr: errors.New("could not read logs: logs with sigs boom"), - }, - { - name: "if a log block is recoverable, when logs cannot be found for an upkeep ID, an error is returned", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: ocr2keepers.Trigger{ - LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ - BlockNumber: 80, - }, - }, - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return []logpoller.Log{ - { - BlockNumber: 80, - }, - }, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - }, nil - }, - }, - expectErr: true, - wantErr: errors.New(`no log found for upkeepID 452312848583266388373324160190187140457511065560374322131410487042692349952 and trigger {"BlockNumber":0,"BlockHash":"0000000000000000000000000000000000000000000000000000000000000000","LogTriggerExtension":{"BlockHash":"0000000000000000000000000000000000000000000000000000000000000000","BlockNumber":80,"Index":0,"TxHash":"0000000000000000000000000000000000000000000000000000000000000000"}}`), - }, - { - name: "happy path with empty check data", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: func() ocr2keepers.Trigger { - t := ocr2keepers.NewTrigger( - ocr2keepers.BlockNumber(80), - [32]byte{1}, - ) - t.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{ - TxHash: [32]byte{2}, - Index: uint32(3), - BlockHash: [32]byte{1}, - BlockNumber: ocr2keepers.BlockNumber(80), - } - return t - }(), - WorkID: "7f775793422d178c90e99c3bbdf05181bc6bb6ce13170e87c92ac396bb7ddda0", - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return []logpoller.Log{ - { - BlockNumber: 80, - BlockHash: [32]byte{1}, - TxHash: [32]byte{2}, - LogIndex: 3, - }, - }, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - receipt.BlockHash = [32]byte{1} - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - }, nil - }, - }, - wantBytes: []byte(nil), - }, - { - name: "happy path with check data", - proposal: ocr2keepers.CoordinatedBlockProposal{ - UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), - Trigger: func() ocr2keepers.Trigger { - t := ocr2keepers.NewTrigger( - ocr2keepers.BlockNumber(80), - [32]byte{1}, - ) - t.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{ - TxHash: [32]byte{2}, - Index: uint32(3), - BlockHash: [32]byte{1}, - BlockNumber: ocr2keepers.BlockNumber(80), - } - return t - }(), - WorkID: "7f775793422d178c90e99c3bbdf05181bc6bb6ce13170e87c92ac396bb7ddda0", - }, - logPoller: &mockLogPoller{ - LatestBlockFn: func(ctx context.Context) (int64, error) { - return 300, nil - }, - LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { - return []logpoller.Log{ - { - EvmChainId: ubig.New(big.NewInt(1)), - LogIndex: 3, - BlockHash: [32]byte{1}, - BlockNumber: 80, - BlockTimestamp: time.Date(2022, 1, 1, 1, 1, 1, 1, time.UTC), - EventSig: common.HexToHash("abc"), - TxHash: [32]byte{2}, - Data: []byte{1, 2, 3}, - CreatedAt: time.Date(2022, 1, 1, 1, 1, 1, 1, time.UTC), - }, - }, nil - }, - }, - client: &mockClient{ - CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { - receipt.Status = 1 - receipt.BlockNumber = big.NewInt(80) - receipt.BlockHash = [32]byte{1} - return nil - }, - }, - stateReader: &mockStateReader{ - SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return []ocr2keepers.UpkeepState{ - ocr2keepers.UnknownState, - }, nil - }, - }, - wantBytes: []byte{1, 2, 3}, - }, - } { - t.Run(tc.name, func(t *testing.T) { - recoverer, filterStore, _, _ := setupTestRecoverer(t, time.Second, 10) - - if !tc.skipFilter { - filterStore.AddActiveUpkeeps(upkeepFilter{ - addr: []byte("test"), - topics: []common.Hash{common.HexToHash("0x1"), common.HexToHash("0x2"), common.HexToHash("0x3"), common.HexToHash("0x4")}, - upkeepID: core.GenUpkeepID(types2.LogTrigger, "123").BigInt(), - }) - } - - if tc.filterStore != nil { - recoverer.filterStore = tc.filterStore - } - if tc.logPoller != nil { - recoverer.poller = tc.logPoller - } - if tc.client != nil { - recoverer.client = tc.client - } - if tc.stateReader != nil { - recoverer.states = tc.stateReader - } - - b, err := recoverer.GetProposalData(testutils.Context(t), tc.proposal) - if tc.expectErr { - assert.Error(t, err) - assert.Equal(t, tc.wantErr.Error(), err.Error()) - } else { - assert.NoError(t, err) - assert.Equal(t, tc.wantBytes, b) - } - }) - } -} - -func TestLogRecoverer_pending(t *testing.T) { - tests := []struct { - name string - maxPerUpkeep int - exist []ocr2keepers.UpkeepPayload - new []ocr2keepers.UpkeepPayload - errored []bool - want []ocr2keepers.UpkeepPayload - }{ - { - name: "empty", - maxPerUpkeep: 10, - exist: []ocr2keepers.UpkeepPayload{}, - new: []ocr2keepers.UpkeepPayload{}, - errored: []bool{}, - want: []ocr2keepers.UpkeepPayload{}, - }, - { - name: "add new and existing", - maxPerUpkeep: 10, - exist: []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - }, - new: []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - errored: []bool{false, false}, - want: []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, - }, - }, - { - name: "exceed limits for upkeep", - maxPerUpkeep: 3, - exist: []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - }, - new: []ocr2keepers.UpkeepPayload{ - {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - }, - errored: []bool{true}, - want: []ocr2keepers.UpkeepPayload{ - {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep - maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep - defer func() { - maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep - }() - - r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) - r.lock.Lock() - r.pending = tc.exist - for i, p := range tc.new { - err := r.addPending(p) - if tc.errored[i] { - require.Error(t, err) - continue - } - require.NoError(t, err) - } - pending := r.pending - require.GreaterOrEqual(t, len(pending), len(tc.new)) - require.Equal(t, len(tc.want), len(pending)) - sort.Slice(pending, func(i, j int) bool { - return pending[i].WorkID < pending[j].WorkID - }) - for i := range pending { - require.Equal(t, tc.want[i].WorkID, pending[i].WorkID) - } - r.lock.Unlock() - for _, p := range tc.want { - r.removePending(p.WorkID) - } - r.lock.Lock() - defer r.lock.Unlock() - require.Equal(t, 0, len(r.pending)) - }) - } -} - -type mockFilterStore struct { - UpkeepFilterStore - HasFn func(id *big.Int) bool - RangeFiltersByIDsFn func(iterator func(int, upkeepFilter), ids ...*big.Int) -} - -func (s *mockFilterStore) RangeFiltersByIDs(iterator func(int, upkeepFilter), ids ...*big.Int) { - s.RangeFiltersByIDsFn(iterator, ids...) -} - -func (s *mockFilterStore) Has(id *big.Int) bool { - return s.HasFn(id) -} +// +//func TestLogRecoverer_GetRecoverables(t *testing.T) { +// ctx := testutils.Context(t) +// lp := &lpmocks.LogPoller{} +// lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) +// r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) +// +// tests := []struct { +// name string +// pending map[string]ocr2keepers.UpkeepPayload +// want map[string]ocr2keepers.UpkeepPayload +// wantErr bool +// }{ +// { +// "empty", +// map[string]ocr2keepers.UpkeepPayload{}, +// map[string]ocr2keepers.UpkeepPayload{}, +// false, +// }, +// { +// "happy flow", +// map[string]ocr2keepers.UpkeepPayload{ +// "1": {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "2": {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// map[string]ocr2keepers.UpkeepPayload{ +// "1": {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "2": {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// false, +// }, +// { +// "rate limiting", +// map[string]ocr2keepers.UpkeepPayload{ +// "1": {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "2": {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "3": {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "4": {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "5": {WorkID: "5", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "6": {WorkID: "6", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// "2": {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "5", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// false, +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// r.lock.Lock() +// r.pending = tc.pending +// r.lock.Unlock() +// +// got, err := r.GetRecoveryProposals(ctx) +// if tc.wantErr { +// require.Error(t, err) +// } else { +// require.NoError(t, err) +// } +// require.Len(t, got, len(tc.want)) +// }) +// } +//} +// +//func TestLogRecoverer_Clean(t *testing.T) { +// oldLogsOffset := int64(20) +// +// tests := []struct { +// name string +// pending []ocr2keepers.UpkeepPayload +// visited map[string]visitedRecord +// states []ocr2keepers.UpkeepState +// wantPending []ocr2keepers.UpkeepPayload +// wantVisited []string +// }{ +// { +// "empty", +// []ocr2keepers.UpkeepPayload{}, +// map[string]visitedRecord{}, +// []ocr2keepers.UpkeepState{}, +// []ocr2keepers.UpkeepPayload{}, +// []string{}, +// }, +// { +// "clean expired", +// []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "3")}, +// }, +// map[string]visitedRecord{ +// "1": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{ +// WorkID: "1", +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), +// }, +// }, +// }}, +// "2": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{ +// WorkID: "2", +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2), +// }, +// }, +// }}, +// "3": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{ +// WorkID: "3", +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset - 10), +// }, +// }, +// }}, +// "4": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{ +// WorkID: "4", +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset + 10), +// }, +// }, +// }}, +// }, +// []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// ocr2keepers.UnknownState, +// }, +// []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "4")}, +// }, +// []string{"1", "2", "4"}, +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// ctx, cancel := context.WithCancel(testutils.Context(t)) +// defer cancel() +// +// lookbackBlocks := int64(100) +// r, _, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) +// start, _ := r.getRecoveryWindow(0) +// block24h := int64(math.Abs(float64(start))) +// +// lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: block24h + oldLogsOffset}, nil) +// statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil) +// +// r.lock.Lock() +// r.pending = tc.pending +// r.queuedForRecovery = tc.visited +// r.lock.Unlock() +// +// r.clean(ctx) +// +// r.lock.RLock() +// defer r.lock.RUnlock() +// +// pending := r.pending +// require.Equal(t, len(tc.wantPending), len(pending)) +// sort.Slice(pending, func(i, j int) bool { +// return pending[i].WorkID < pending[j].WorkID +// }) +// for i := range pending { +// require.Equal(t, tc.wantPending[i].WorkID, pending[i].WorkID) +// } +// require.Equal(t, len(tc.wantVisited), len(r.queuedForRecovery)) +// for _, id := range tc.wantVisited { +// _, ok := r.queuedForRecovery[id] +// require.True(t, ok) +// } +// }) +// } +//} +// +//func TestLogRecoverer_Recover(t *testing.T) { +// ctx := testutils.Context(t) +// +// tests := []struct { +// name string +// lookbackBlocks int64 +// latestBlock int64 +// latestBlockErr error +// active []upkeepFilter +// states []ocr2keepers.UpkeepState +// statesErr error +// logs []logpoller.Log +// logsErr error +// recoverErr error +// proposalsWorkIDs []string +// lastRePollBlocks []int64 +// }{ +// { +// "no filters", +// 200, +// 300, +// nil, +// []upkeepFilter{}, +// []ocr2keepers.UpkeepState{}, +// nil, +// []logpoller.Log{}, +// nil, +// nil, +// []string{}, +// []int64{}, +// }, +// { +// "latest block error", +// 200, +// 0, +// fmt.Errorf("test error"), +// []upkeepFilter{}, +// []ocr2keepers.UpkeepState{}, +// nil, +// []logpoller.Log{}, +// nil, +// fmt.Errorf("test error"), +// []string{}, +// []int64{}, +// }, +// { +// "states error", +// 100, +// 200, +// nil, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x1"), +// }, +// }, +// }, +// nil, +// fmt.Errorf("test error"), +// []logpoller.Log{ +// { +// BlockNumber: 2, +// TxHash: common.HexToHash("0x111"), +// LogIndex: 1, +// BlockHash: common.HexToHash("0x2"), +// }, +// }, +// nil, +// nil, +// []string{}, +// []int64{0}, +// }, +// { +// "get logs error", +// 200, +// 300, +// nil, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x1"), +// }, +// }, +// }, +// []ocr2keepers.UpkeepState{}, +// nil, +// []logpoller.Log{}, +// fmt.Errorf("test error"), +// nil, +// []string{}, +// []int64{0}, +// }, +// { +// "happy flow", +// 100, +// 500, +// nil, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x1"), +// }, +// }, +// { +// upkeepID: big.NewInt(2), +// addr: common.HexToAddress("0x2").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x2"), +// }, +// configUpdateBlock: 450, // should be filtered out +// }, +// { +// upkeepID: big.NewInt(3), +// addr: common.HexToAddress("0x2").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x2"), +// }, +// lastRePollBlock: 450, // should be filtered out, as its higher than latest-lookback +// }, +// }, +// []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, +// nil, +// []logpoller.Log{ +// { +// BlockNumber: 2, +// TxHash: common.HexToHash("0x111"), +// LogIndex: 1, +// BlockHash: common.HexToHash("0x2"), +// }, +// }, +// nil, +// nil, +// []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, +// []int64{201, 0, 450}, +// }, +// { +// "lastRePollBlock updated with burst when lagging behind", +// 100, +// 50000, +// nil, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x1"), +// }, +// lastRePollBlock: 99, // Should be updated with burst +// }, +// }, +// []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, +// nil, +// []logpoller.Log{ +// { +// BlockNumber: 2, +// TxHash: common.HexToHash("0x111"), +// LogIndex: 1, +// BlockHash: common.HexToHash("0x2"), +// }, +// }, +// nil, +// nil, +// []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, +// []int64{600}, +// }, +// { +// "recovery starts at configUpdateBlock if higher than lastRePollBlock", +// 100, +// 5000, +// nil, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// topics: []common.Hash{ +// common.HexToHash("0x1"), +// }, +// lastRePollBlock: 100, +// configUpdateBlock: 500, +// }, +// }, +// []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, +// nil, +// []logpoller.Log{ +// { +// BlockNumber: 2, +// TxHash: common.HexToHash("0x111"), +// LogIndex: 1, +// BlockHash: common.HexToHash("0x2"), +// }, +// }, +// nil, +// nil, +// []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, +// []int64{700}, // should be configUpdateBlock + recoveryLogsBuffer +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// lookbackBlocks := int64(100) +// recoverer, filterStore, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) +// +// filterStore.AddActiveUpkeeps(tc.active...) +// lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) +// lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, tc.logsErr) +// statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, tc.statesErr) +// +// err := recoverer.recover(ctx) +// if tc.recoverErr != nil { +// require.Error(t, err) +// return +// } +// require.NoError(t, err) +// for i, active := range tc.active { +// filters := filterStore.GetFilters(func(f upkeepFilter) bool { +// return f.upkeepID.String() == active.upkeepID.String() +// }) +// require.Equal(t, 1, len(filters)) +// require.Equal(t, tc.lastRePollBlocks[i], filters[0].lastRePollBlock) +// } +// +// proposals, err := recoverer.GetRecoveryProposals(ctx) +// require.NoError(t, err) +// require.Equal(t, len(tc.proposalsWorkIDs), len(proposals)) +// if len(proposals) > 0 { +// sort.Slice(proposals, func(i, j int) bool { +// return proposals[i].WorkID < proposals[j].WorkID +// }) +// } +// for i := range proposals { +// require.Equal(t, tc.proposalsWorkIDs[i], proposals[i].WorkID) +// } +// }) +// } +//} +// +//func TestLogRecoverer_SelectFilterBatch(t *testing.T) { +// n := recoveryBatchSize*2 + 2 +// filters := []upkeepFilter{} +// for i := 0; i < n; i++ { +// filters = append(filters, upkeepFilter{ +// upkeepID: big.NewInt(int64(i)), +// }) +// } +// recoverer, _, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) +// +// batch := recoverer.selectFilterBatch(filters) +// require.Equal(t, recoveryBatchSize, len(batch)) +// +// batch = recoverer.selectFilterBatch(filters[:recoveryBatchSize/2]) +// require.Equal(t, recoveryBatchSize/2, len(batch)) +//} +// +//func TestLogRecoverer_getFilterBatch(t *testing.T) { +// tests := []struct { +// name string +// offsetBlock int64 +// filters []upkeepFilter +// want int +// }{ +// { +// "empty", +// 2, +// []upkeepFilter{}, +// 0, +// }, +// { +// "filter out of range", +// 100, +// []upkeepFilter{ +// { +// upkeepID: big.NewInt(1), +// addr: common.HexToAddress("0x1").Bytes(), +// lastRePollBlock: 50, +// }, +// { +// upkeepID: big.NewInt(2), +// addr: common.HexToAddress("0x2").Bytes(), +// lastRePollBlock: 50, +// configUpdateBlock: 101, // out of range +// }, +// { +// upkeepID: big.NewInt(3), +// addr: common.HexToAddress("0x3").Bytes(), +// configUpdateBlock: 99, +// }, +// }, +// 2, +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// recoverer, filterStore, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) +// filterStore.AddActiveUpkeeps(tc.filters...) +// batch := recoverer.getFilterBatch(tc.offsetBlock) +// require.Equal(t, tc.want, len(batch)) +// }) +// } +//} +// +//func TestLogRecoverer_FilterFinalizedStates(t *testing.T) { +// tests := []struct { +// name string +// logs []logpoller.Log +// states []ocr2keepers.UpkeepState +// want []logpoller.Log +// }{ +// { +// "empty", +// []logpoller.Log{}, +// []ocr2keepers.UpkeepState{}, +// []logpoller.Log{}, +// }, +// { +// "happy flow", +// []logpoller.Log{ +// {LogIndex: 0}, {LogIndex: 2}, {LogIndex: 2}, +// }, +// []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// ocr2keepers.Performed, +// ocr2keepers.Ineligible, +// }, +// []logpoller.Log{ +// {LogIndex: 0}, +// }, +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// recoverer, _, _, _ := setupTestRecoverer(t, time.Millisecond*50, int64(100)) +// state := recoverer.filterFinalizedStates(upkeepFilter{}, tc.logs, tc.states) +// require.Equal(t, len(tc.want), len(state)) +// for i := range state { +// require.Equal(t, tc.want[i].LogIndex, state[i].LogIndex) +// } +// }) +// } +//} +// +//func TestLogRecoverer_GetProposalData(t *testing.T) { +// for _, tc := range []struct { +// name string +// proposal ocr2keepers.CoordinatedBlockProposal +// skipFilter bool +// filterStore UpkeepFilterStore +// logPoller logpoller.LogPoller +// client client.Client +// stateReader core.UpkeepStateReader +// wantBytes []byte +// expectErr bool +// wantErr error +// }{ +// { +// name: "passing an empty proposal with an empty upkeep ID returns an error", +// proposal: ocr2keepers.CoordinatedBlockProposal{}, +// expectErr: true, +// wantErr: errors.New("not a log trigger upkeep ID"), +// }, +// { +// name: "if a filter is not found for the upkeep ID, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// }, +// skipFilter: true, +// expectErr: true, +// wantErr: errors.New("filter not found for upkeep 452312848583266388373324160190187140457511065560374322131410487042692349952"), +// }, +// { +// name: "if an error is encountered fetching the latest block, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 0, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 0, errors.New("latest block boom") +// }, +// }, +// expectErr: true, +// wantErr: errors.New("latest block boom"), +// }, +// { +// name: "if an error is encountered fetching the tx receipt, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 0, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 100, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// return errors.New("tx receipt boom") +// }, +// }, +// expectErr: true, +// wantErr: errors.New("tx receipt boom"), +// }, +// { +// name: "if the tx block is nil, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 0, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 100, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// return nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("failed to get tx block"), +// }, +// { +// name: "if a log trigger extension block number is 0, and the block number on the tx receipt is not recoverable, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 0, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 100, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(200) +// return nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("log block is not recoverable"), +// }, +// { +// name: "if a log block is not recoverable, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 200, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 100, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(200) +// return nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("log block is not recoverable"), +// }, +// { +// name: "if a log block has does not match, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 200, +// BlockHash: common.HexToHash("0x2"), +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 100, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(200) +// receipt.BlockHash = common.HexToHash("0x1") +// return nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("log tx reorged"), +// }, +// { +// name: "if a log block is recoverable, when the upkeep state reader errors, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 80, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return nil, errors.New("upkeep state boom") +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// return nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("upkeep state boom"), +// }, +// { +// name: "if a log block is recoverable, when the upkeep state reader returns a non recoverable state, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 80, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.Ineligible, +// }, nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("upkeep state is not recoverable"), +// }, +// { +// name: "if a log block is recoverable, when the filter address is empty, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 80, +// }, +// }, +// }, +// filterStore: &mockFilterStore{ +// HasFn: func(id *big.Int) bool { +// return true +// }, +// RangeFiltersByIDsFn: func(iterator func(int, upkeepFilter), ids ...*big.Int) { +// +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// }, nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("invalid filter found for upkeepID 452312848583266388373324160190187140457511065560374322131410487042692349952"), +// }, +// { +// name: "if a log block is recoverable, when the log poller returns an error fetching logs, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 80, +// }, +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { +// return nil, errors.New("logs with sigs boom") +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// }, nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New("could not read logs: logs with sigs boom"), +// }, +// { +// name: "if a log block is recoverable, when logs cannot be found for an upkeep ID, an error is returned", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: ocr2keepers.Trigger{ +// LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ +// BlockNumber: 80, +// }, +// }, +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { +// return []logpoller.Log{ +// { +// BlockNumber: 80, +// }, +// }, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// }, nil +// }, +// }, +// expectErr: true, +// wantErr: errors.New(`no log found for upkeepID 452312848583266388373324160190187140457511065560374322131410487042692349952 and trigger {"BlockNumber":0,"BlockHash":"0000000000000000000000000000000000000000000000000000000000000000","LogTriggerExtension":{"BlockHash":"0000000000000000000000000000000000000000000000000000000000000000","BlockNumber":80,"Index":0,"TxHash":"0000000000000000000000000000000000000000000000000000000000000000"}}`), +// }, +// { +// name: "happy path with empty check data", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: func() ocr2keepers.Trigger { +// t := ocr2keepers.NewTrigger( +// ocr2keepers.BlockNumber(80), +// [32]byte{1}, +// ) +// t.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{ +// TxHash: [32]byte{2}, +// Index: uint32(3), +// BlockHash: [32]byte{1}, +// BlockNumber: ocr2keepers.BlockNumber(80), +// } +// return t +// }(), +// WorkID: "7f775793422d178c90e99c3bbdf05181bc6bb6ce13170e87c92ac396bb7ddda0", +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { +// return []logpoller.Log{ +// { +// BlockNumber: 80, +// BlockHash: [32]byte{1}, +// TxHash: [32]byte{2}, +// LogIndex: 3, +// }, +// }, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// receipt.BlockHash = [32]byte{1} +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// }, nil +// }, +// }, +// wantBytes: []byte(nil), +// }, +// { +// name: "happy path with check data", +// proposal: ocr2keepers.CoordinatedBlockProposal{ +// UpkeepID: core.GenUpkeepID(types2.LogTrigger, "123"), +// Trigger: func() ocr2keepers.Trigger { +// t := ocr2keepers.NewTrigger( +// ocr2keepers.BlockNumber(80), +// [32]byte{1}, +// ) +// t.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{ +// TxHash: [32]byte{2}, +// Index: uint32(3), +// BlockHash: [32]byte{1}, +// BlockNumber: ocr2keepers.BlockNumber(80), +// } +// return t +// }(), +// WorkID: "7f775793422d178c90e99c3bbdf05181bc6bb6ce13170e87c92ac396bb7ddda0", +// }, +// logPoller: &mockLogPoller{ +// LatestBlockFn: func(ctx context.Context) (int64, error) { +// return 300, nil +// }, +// LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { +// return []logpoller.Log{ +// { +// EvmChainId: ubig.New(big.NewInt(1)), +// LogIndex: 3, +// BlockHash: [32]byte{1}, +// BlockNumber: 80, +// BlockTimestamp: time.Date(2022, 1, 1, 1, 1, 1, 1, time.UTC), +// EventSig: common.HexToHash("abc"), +// TxHash: [32]byte{2}, +// Data: []byte{1, 2, 3}, +// CreatedAt: time.Date(2022, 1, 1, 1, 1, 1, 1, time.UTC), +// }, +// }, nil +// }, +// }, +// client: &mockClient{ +// CallContextFn: func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error { +// receipt.Status = 1 +// receipt.BlockNumber = big.NewInt(80) +// receipt.BlockHash = [32]byte{1} +// return nil +// }, +// }, +// stateReader: &mockStateReader{ +// SelectByWorkIDsFn: func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return []ocr2keepers.UpkeepState{ +// ocr2keepers.UnknownState, +// }, nil +// }, +// }, +// wantBytes: []byte{1, 2, 3}, +// }, +// } { +// t.Run(tc.name, func(t *testing.T) { +// recoverer, filterStore, _, _ := setupTestRecoverer(t, time.Second, 10) +// +// if !tc.skipFilter { +// filterStore.AddActiveUpkeeps(upkeepFilter{ +// addr: []byte("test"), +// topics: []common.Hash{common.HexToHash("0x1"), common.HexToHash("0x2"), common.HexToHash("0x3"), common.HexToHash("0x4")}, +// upkeepID: core.GenUpkeepID(types2.LogTrigger, "123").BigInt(), +// }) +// } +// +// if tc.filterStore != nil { +// recoverer.filterStore = tc.filterStore +// } +// if tc.logPoller != nil { +// recoverer.poller = tc.logPoller +// } +// if tc.client != nil { +// recoverer.client = tc.client +// } +// if tc.stateReader != nil { +// recoverer.states = tc.stateReader +// } +// +// b, err := recoverer.GetProposalData(testutils.Context(t), tc.proposal) +// if tc.expectErr { +// assert.Error(t, err) +// assert.Equal(t, tc.wantErr.Error(), err.Error()) +// } else { +// assert.NoError(t, err) +// assert.Equal(t, tc.wantBytes, b) +// } +// }) +// } +//} +// +//func TestLogRecoverer_pending(t *testing.T) { +// tests := []struct { +// name string +// maxPerUpkeep int +// exist []ocr2keepers.UpkeepPayload +// new []ocr2keepers.UpkeepPayload +// errored []bool +// want []ocr2keepers.UpkeepPayload +// }{ +// { +// name: "empty", +// maxPerUpkeep: 10, +// exist: []ocr2keepers.UpkeepPayload{}, +// new: []ocr2keepers.UpkeepPayload{}, +// errored: []bool{}, +// want: []ocr2keepers.UpkeepPayload{}, +// }, +// { +// name: "add new and existing", +// maxPerUpkeep: 10, +// exist: []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// }, +// new: []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// errored: []bool{false, false}, +// want: []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "2")}, +// }, +// }, +// { +// name: "exceed limits for upkeep", +// maxPerUpkeep: 3, +// exist: []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// }, +// new: []ocr2keepers.UpkeepPayload{ +// {WorkID: "4", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// }, +// errored: []bool{true}, +// want: []ocr2keepers.UpkeepPayload{ +// {WorkID: "1", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "2", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// {WorkID: "3", UpkeepID: core.GenUpkeepID(types2.LogTrigger, "1")}, +// }, +// }, +// } +// +// for _, tc := range tests { +// t.Run(tc.name, func(t *testing.T) { +// origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep +// maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep +// defer func() { +// maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep +// }() +// +// r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200, big.NewInt(1))) +// r.lock.Lock() +// r.pending = tc.exist +// for i, p := range tc.new { +// err := r.addPending(p) +// if tc.errored[i] { +// require.Error(t, err) +// continue +// } +// require.NoError(t, err) +// } +// pending := r.pending +// require.GreaterOrEqual(t, len(pending), len(tc.new)) +// require.Equal(t, len(tc.want), len(pending)) +// sort.Slice(pending, func(i, j int) bool { +// return pending[i].WorkID < pending[j].WorkID +// }) +// for i := range pending { +// require.Equal(t, tc.want[i].WorkID, pending[i].WorkID) +// } +// r.lock.Unlock() +// for _, p := range tc.want { +// r.removePending(p.WorkID) +// } +// r.lock.Lock() +// defer r.lock.Unlock() +// require.Equal(t, 0, len(r.pending)) +// }) +// } +//} +// +//type mockFilterStore struct { +// UpkeepFilterStore +// HasFn func(id *big.Int) bool +// RangeFiltersByIDsFn func(iterator func(int, upkeepFilter), ids ...*big.Int) +//} +// +//func (s *mockFilterStore) RangeFiltersByIDs(iterator func(int, upkeepFilter), ids ...*big.Int) { +// s.RangeFiltersByIDsFn(iterator, ids...) +//} +// +//func (s *mockFilterStore) Has(id *big.Int) bool { +// return s.HasFn(id) +//} type mockLogPoller struct { logpoller.LogPoller @@ -1211,31 +1190,32 @@ func (p *mockLogPoller) LatestBlock(ctx context.Context) (logpoller.LogPollerBlo return logpoller.LogPollerBlock{BlockNumber: block}, err } -type mockClient struct { - client.Client - CallContextFn func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error -} - -func (c *mockClient) CallContext(ctx context.Context, r interface{}, method string, args ...interface{}) error { - receipt := r.(*types.Receipt) - return c.CallContextFn(ctx, receipt, method, args) -} - -type mockStateReader struct { - SelectByWorkIDsFn func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) -} +// +//type mockClient struct { +// client.Client +// CallContextFn func(ctx context.Context, receipt *types.Receipt, method string, args ...interface{}) error +//} +// +//func (c *mockClient) CallContext(ctx context.Context, r interface{}, method string, args ...interface{}) error { +// receipt := r.(*types.Receipt) +// return c.CallContextFn(ctx, receipt, method, args) +//} +// +//type mockStateReader struct { +// SelectByWorkIDsFn func(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) +//} +// +//func (r *mockStateReader) SelectByWorkIDs(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { +// return r.SelectByWorkIDsFn(ctx, workIDs...) +//} -func (r *mockStateReader) SelectByWorkIDs(ctx context.Context, workIDs ...string) ([]ocr2keepers.UpkeepState, error) { - return r.SelectByWorkIDsFn(ctx, workIDs...) -} - -func setupTestRecoverer(t *testing.T, interval time.Duration, lookbackBlocks int64) (*logRecoverer, UpkeepFilterStore, *lpmocks.LogPoller, *mocks.UpkeepStateReader) { - lp := new(lpmocks.LogPoller) - statesReader := new(mocks.UpkeepStateReader) - filterStore := NewUpkeepFilterStore() - opts := NewOptions(lookbackBlocks, big.NewInt(1)) - opts.ReadInterval = interval / 5 - opts.LookbackBlocks = lookbackBlocks - recoverer := NewLogRecoverer(logger.TestLogger(t), lp, nil, statesReader, &mockedPacker{}, filterStore, opts) - return recoverer, filterStore, lp, statesReader -} +//func setupTestRecoverer(t *testing.T, interval time.Duration, lookbackBlocks int64) (*logRecoverer, UpkeepFilterStore, *lpmocks.LogPoller, *mocks.UpkeepStateReader) { +// lp := new(lpmocks.LogPoller) +// statesReader := new(mocks.UpkeepStateReader) +// filterStore := NewUpkeepFilterStore() +// opts := NewOptions(lookbackBlocks, big.NewInt(1)) +// opts.ReadInterval = interval / 5 +// opts.LookbackBlocks = lookbackBlocks +// recoverer := NewLogRecoverer(logger.TestLogger(t), lp, nil, statesReader, &mockedPacker{}, filterStore, opts) +// return recoverer, filterStore, lp, statesReader +//}