Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Time recovery calls #13790

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2
}

func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
start := time.Now()
defer func() {
r.lggr.Debugw("GetRecoveryProposals finished", "time", time.Since(start))
}()

latestBlock, err := r.poller.LatestBlock(ctx)
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err)
Expand Down Expand Up @@ -329,6 +334,11 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.
}

func (r *logRecoverer) recover(ctx context.Context) error {
startTime := time.Now()
defer func() {
r.lggr.Debugw("recover finished", "time", time.Since(startTime))
}()

latest, err := r.poller.LatestBlock(ctx)
if err != nil {
return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err)
Expand Down Expand Up @@ -367,6 +377,11 @@ func (r *logRecoverer) recover(ctx context.Context) error {

// recoverFilter recovers logs for a single upkeep filter.
func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error {
startTime := time.Now()
defer func() {
r.lggr.Debugw("recoverFilter finished", "time", time.Since(startTime))
}()

start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out
// ensure we don't recover logs from before the filter was created
if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock {
Expand Down Expand Up @@ -437,6 +452,11 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
// returns the number of logs added, the number of logs that were already pending,
// and a flag that indicates whether some errors happened while we are trying to add to pending q.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) {
start := time.Now()
defer func() {
r.lggr.Debugw("populatePending finished", "time", time.Since(start))
}()

r.lock.Lock()
defer r.lock.Unlock()

Expand Down Expand Up @@ -584,6 +604,11 @@ func logToTrigger(log logpoller.Log) ocr2keepers.Trigger {
}

func (r *logRecoverer) clean(ctx context.Context) {
start := time.Now()
defer func() {
r.lggr.Debugw("clean finished", "time", time.Since(start))
}()

r.lock.RLock()
var expired []string
for id, t := range r.visited {
Expand Down Expand Up @@ -659,6 +684,11 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
// addPending adds a payload to the pending list if it's not already there.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error {
start := time.Now()
defer func() {
r.lggr.Debugw("addPending finished", "time", time.Since(start))
}()

var exist bool
pending := r.pending
upkeepPayloads := 0
Expand All @@ -683,6 +713,11 @@ func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error {
// removePending removes a payload from the pending list.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) removePending(workID string) {
start := time.Now()
defer func() {
r.lggr.Debugw("removePending finished", "time", time.Since(start))
}()

updated := make([]ocr2keepers.UpkeepPayload, 0, len(r.pending))
for _, p := range r.pending {
if p.WorkID != workID {
Expand All @@ -698,6 +733,11 @@ func (r *logRecoverer) removePending(workID string) {
// Divided by 10 to ensure that nodes with similar block numbers won't end up with different order.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) sortPending(latestBlock uint64) {
start := time.Now()
defer func() {
r.lggr.Debugw("sortPending finished", "time", time.Since(start))
}()

normalized := latestBlock / 100
if normalized == 0 {
normalized = 1
Expand Down
Loading