diff --git a/packages/beacon-node/src/chain/initState.ts b/packages/beacon-node/src/chain/initState.ts index a0bd5c4968a1..20a2188136b5 100644 --- a/packages/beacon-node/src/chain/initState.ts +++ b/packages/beacon-node/src/chain/initState.ts @@ -87,7 +87,7 @@ export async function initStateFromEth1({ const builder = new GenesisBuilder({ config, - eth1Provider: new Eth1Provider(config, opts, signal), + eth1Provider: new Eth1Provider(config, {...opts, logger}, signal), logger, signal, pendingStatus: diff --git a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts index a7fc91fafb06..f36f70abbbc4 100644 --- a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts +++ b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts @@ -175,16 +175,16 @@ export class Eth1DepositDataTracker { await sleep(sleepTimeMs, this.signal); } } catch (e) { + this.metrics?.eth1.depositTrackerUpdateErrors.inc(1); + // From Infura: 429 Too Many Requests if (e instanceof HttpRpcError && e.status === 429) { this.logger.debug("Eth1 provider rate limited", {}, e); await sleep(RATE_LIMITED_WAIT_MS, this.signal); + // only log error if state switched from online to some other state } else if (!isErrorAborted(e)) { - this.logger.error("Error updating eth1 chain cache", {}, e as Error); await sleep(MIN_WAIT_ON_ERROR_MS, this.signal); } - - this.metrics?.eth1.depositTrackerUpdateErrors.inc(1); } } } diff --git a/packages/beacon-node/src/eth1/index.ts b/packages/beacon-node/src/eth1/index.ts index f53af42ff6a3..9fdba90258a2 100644 --- a/packages/beacon-node/src/eth1/index.ts +++ b/packages/beacon-node/src/eth1/index.ts @@ -67,7 +67,13 @@ export class Eth1ForBlockProduction implements IEth1ForBlockProduction { modules: Eth1DepositDataTrackerModules & Eth1MergeBlockTrackerModules & {eth1Provider?: IEth1Provider} ) { const eth1Provider = - modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal, modules.metrics?.eth1HttpClient); + modules.eth1Provider || + new Eth1Provider( + modules.config, + {...opts, logger: modules.logger}, + modules.signal, + modules.metrics?.eth1HttpClient + ); this.eth1DepositDataTracker = opts.disableEth1DepositDataTracker ? null diff --git a/packages/beacon-node/src/eth1/interface.ts b/packages/beacon-node/src/eth1/interface.ts index 550213f9b252..fc9626eb5b8a 100644 --- a/packages/beacon-node/src/eth1/interface.ts +++ b/packages/beacon-node/src/eth1/interface.ts @@ -29,6 +29,14 @@ export interface IEth1Provider { getBlocksByNumber(fromBlock: number, toBlock: number): Promise; getDepositEvents(fromBlock: number, toBlock: number): Promise; validateContract(): Promise; + getState(): Eth1ProviderState; +} + +export enum Eth1ProviderState { + ONLINE = "ONLINE", + OFFLINE = "OFFLINE", + ERROR = "ERROR", + AUTH_FAILED = "AUTH_FAILED", } export type Eth1DataAndDeposits = { diff --git a/packages/beacon-node/src/eth1/provider/eth1Provider.ts b/packages/beacon-node/src/eth1/provider/eth1Provider.ts index 3fe5913d7a87..eb8f37d37489 100644 --- a/packages/beacon-node/src/eth1/provider/eth1Provider.ts +++ b/packages/beacon-node/src/eth1/provider/eth1Provider.ts @@ -1,15 +1,25 @@ import {toHexString} from "@chainsafe/ssz"; import {phase0} from "@lodestar/types"; import {ChainConfig} from "@lodestar/config"; -import {fromHex} from "@lodestar/utils"; +import {fromHex, isErrorAborted, createElapsedTimeTracker} from "@lodestar/utils"; +import {Logger} from "@lodestar/logger"; +import {FetchError, isFetchError} from "@lodestar/api"; import {linspace} from "../../util/numpy.js"; import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js"; -import {Eth1Block, IEth1Provider} from "../interface.js"; +import {Eth1Block, Eth1ProviderState, IEth1Provider} from "../interface.js"; import {DEFAULT_PROVIDER_URLS, Eth1Options} from "../options.js"; import {isValidAddress} from "../../util/address.js"; import {EthJsonRpcBlockRaw} from "../interface.js"; -import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js"; +import {HTTP_CONNECTION_ERROR_CODES, HTTP_FATAL_ERROR_CODES} from "../../execution/engine/utils.js"; +import { + ErrorJsonRpcResponse, + HttpRpcError, + JsonRpcHttpClient, + JsonRpcHttpClientEvent, + JsonRpcHttpClientMetrics, + ReqOpts, +} from "./jsonRpcHttpClient.js"; import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js"; /* eslint-disable @typescript-eslint/naming-convention */ @@ -42,17 +52,23 @@ const getBlockByHashOpts: ReqOpts = {routeId: "getBlockByHash"}; const getBlockNumberOpts: ReqOpts = {routeId: "getBlockNumber"}; const getLogsOpts: ReqOpts = {routeId: "getLogs"}; +const isOneMinutePassed = createElapsedTimeTracker({minElapsedTime: 60_000}); + export class Eth1Provider implements IEth1Provider { readonly deployBlock: number; private readonly depositContractAddress: string; private readonly rpc: JsonRpcHttpClient; + // The default state is ONLINE, it will be updated to offline if we receive a http error + private state: Eth1ProviderState = Eth1ProviderState.ONLINE; + private logger?: Logger; constructor( config: Pick, - opts: Pick, + opts: Pick & {logger?: Logger}, signal?: AbortSignal, metrics?: JsonRpcHttpClientMetrics | null ) { + this.logger = opts.logger; this.deployBlock = opts.depositContractDeployBlock ?? 0; this.depositContractAddress = toHexString(config.DEPOSIT_CONTRACT_ADDRESS); this.rpc = new JsonRpcHttpClient(opts.providerUrls ?? DEFAULT_PROVIDER_URLS, { @@ -62,6 +78,44 @@ export class Eth1Provider implements IEth1Provider { jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined, metrics: metrics, }); + + this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => { + const oldState = this.state; + this.state = Eth1ProviderState.ONLINE; + + if (oldState !== Eth1ProviderState.ONLINE) { + this.logger?.info("Eth1Provider is back online", {oldState, newState: this.state}); + } + }); + + this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => { + if (isErrorAborted(error)) { + this.state = Eth1ProviderState.ONLINE; + } else if ((error as unknown) instanceof HttpRpcError || (error as unknown) instanceof ErrorJsonRpcResponse) { + this.state = Eth1ProviderState.ERROR; + } else if (error && isFetchError(error) && HTTP_FATAL_ERROR_CODES.includes((error as FetchError).code)) { + this.state = Eth1ProviderState.OFFLINE; + } else if (error && isFetchError(error) && HTTP_CONNECTION_ERROR_CODES.includes((error as FetchError).code)) { + this.state = Eth1ProviderState.AUTH_FAILED; + } + + if (this.state !== Eth1ProviderState.ONLINE) { + if (isOneMinutePassed()) { + this.logger?.error( + "Eth1Provider faced error", + { + state: this.state, + lastErrorAt: new Date(Date.now() - isOneMinutePassed.msSinceLastCall).toLocaleTimeString(), + }, + error + ); + } + } + }); + } + + getState(): Eth1ProviderState { + return this.state; } async validateContract(): Promise { diff --git a/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts b/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts index 59454f8c2f9e..9207dc21909f 100644 --- a/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts +++ b/packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts @@ -1,8 +1,33 @@ +import {EventEmitter} from "events"; +import StrictEventEmitter from "strict-event-emitter-types"; import {fetch} from "@lodestar/api"; import {ErrorAborted, TimeoutError, isValidHttpUrl, retry} from "@lodestar/utils"; import {IGauge, IHistogram} from "../../metrics/interface.js"; import {IJson, RpcPayload} from "../interface.js"; import {encodeJwtToken} from "./jwt.js"; + +export enum JsonRpcHttpClientEvent { + /** + * When registered this event will be emitted before the client throws an error. + * This is useful for defining the error behavior in a common place at the time of declaration of the client. + */ + ERROR = "jsonRpcHttpClient:error", + /** + * When registered this event will be emitted before the client returns the valid response to the caller. + * This is useful for defining some common behavior for each request/response cycle + */ + RESPONSE = "jsonRpcHttpClient:response", +} + +export type JsonRpcHttpClientEvents = { + [JsonRpcHttpClientEvent.ERROR]: (event: {payload?: unknown; error: Error}) => void; + [JsonRpcHttpClientEvent.RESPONSE]: (event: {payload?: unknown; response: unknown}) => void; +}; + +export class JsonRpcHttpClientEventEmitter extends (EventEmitter as { + new (): StrictEventEmitter; +}) {} + /** * Limits the amount of response text printed with RPC or parsing errors */ @@ -46,6 +71,7 @@ export interface IJsonRpcHttpClient { fetch(payload: RpcPayload

, opts?: ReqOpts): Promise; fetchWithRetries(payload: RpcPayload

, opts?: ReqOpts): Promise; fetchBatch(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise; + emitter: JsonRpcHttpClientEventEmitter; } export class JsonRpcHttpClient implements IJsonRpcHttpClient { @@ -58,6 +84,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient { */ private readonly jwtSecret?: Uint8Array; private readonly metrics: JsonRpcHttpClientMetrics | null; + readonly emitter = new JsonRpcHttpClientEventEmitter(); constructor( private readonly urls: string[], @@ -107,31 +134,38 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient { * Perform RPC request */ async fetch(payload: RpcPayload

, opts?: ReqOpts): Promise { - const res: RpcResponse = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts); - return parseRpcResponse(res, payload); + return this.wrapWithEvents( + async () => { + const res: RpcResponse = await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts); + return parseRpcResponse(res, payload); + }, + {payload} + ); } /** * Perform RPC request with retry */ async fetchWithRetries(payload: RpcPayload

, opts?: ReqOpts): Promise { - const routeId = opts?.routeId ?? "unknown"; - - const res = await retry>( - async (attempt) => { - /** If this is a retry, increment the retry counter for this method */ - if (attempt > 1) { - this.opts?.metrics?.retryCount.inc({routeId}); + return this.wrapWithEvents(async () => { + const routeId = opts?.routeId ?? "unknown"; + + const res = await retry>( + async (attempt) => { + /** If this is a retry, increment the retry counter for this method */ + if (attempt > 1) { + this.opts?.metrics?.retryCount.inc({routeId}); + } + return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts); + }, + { + retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1, + retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0, + shouldRetry: opts?.shouldRetry, } - return this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts); - }, - { - retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1, - retryDelay: opts?.retryDelay ?? this.opts?.retryDelay ?? 0, - shouldRetry: opts?.shouldRetry, - } - ); - return parseRpcResponse(res, payload); + ); + return parseRpcResponse(res, payload); + }, payload); } /** @@ -139,26 +173,41 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient { * Type-wise assumes all requests results have the same type */ async fetchBatch(rpcPayloadArr: RpcPayload[], opts?: ReqOpts): Promise { - if (rpcPayloadArr.length === 0) return []; - - const resArr: RpcResponse[] = await this.fetchJson( - rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})), - opts - ); + return this.wrapWithEvents(async () => { + if (rpcPayloadArr.length === 0) return []; + + const resArr: RpcResponse[] = await this.fetchJson( + rpcPayloadArr.map(({method, params}) => ({jsonrpc: "2.0", method, params, id: this.id++})), + opts + ); + + if (!Array.isArray(resArr)) { + // Nethermind may reply to batch request with a JSON RPC error + if ((resArr as RpcResponseError).error !== undefined) { + throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch"); + } - if (!Array.isArray(resArr)) { - // Nethermind may reply to batch request with a JSON RPC error - if ((resArr as RpcResponseError).error !== undefined) { - throw new ErrorJsonRpcResponse(resArr as RpcResponseError, "batch"); + throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`); } - throw Error(`expected array of results, got ${resArr} - ${jsonSerializeTry(resArr)}`); - } + return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i])); + }, rpcPayloadArr); + } - return resArr.map((res, i) => parseRpcResponse(res, rpcPayloadArr[i])); + private async wrapWithEvents(func: () => Promise, payload?: unknown): Promise { + try { + const response = await func(); + this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response}); + return response; + } catch (error) { + this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error}); + throw error; + } } private async fetchJson(json: T, opts?: ReqOpts): Promise { + if (this.urls.length === 0) throw Error("No url provided"); + const routeId = opts?.routeId ?? "unknown"; let lastError: Error | null = null; @@ -170,21 +219,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient { try { return await this.fetchJsonOneUrl(this.urls[i], json, opts); } catch (e) { + lastError = e as Error; if (this.opts?.shouldNotFallback?.(e as Error)) { - throw e; + break; } - - lastError = e as Error; } } - - if (lastError !== null) { - throw lastError; - } else if (this.urls.length === 0) { - throw Error("No url provided"); - } else { - throw Error("Unknown error"); - } + throw lastError ?? Error("Unknown error"); } /** diff --git a/packages/beacon-node/src/execution/engine/http.ts b/packages/beacon-node/src/execution/engine/http.ts index 9be9b0d3be99..6f5b3553dcb4 100644 --- a/packages/beacon-node/src/execution/engine/http.ts +++ b/packages/beacon-node/src/execution/engine/http.ts @@ -5,13 +5,13 @@ import { ErrorJsonRpcResponse, HttpRpcError, IJsonRpcHttpClient, + JsonRpcHttpClientEvent, ReqOpts, } from "../../eth1/provider/jsonRpcHttpClient.js"; import {Metrics} from "../../metrics/index.js"; import {JobItemQueue} from "../../util/queue/index.js"; import {EPOCHS_PER_BATCH} from "../../sync/constants.js"; import {numToQuantity} from "../../eth1/provider/utils.js"; -import {IJson, RpcPayload} from "../../eth1/interface.js"; import { ExecutionPayloadStatus, ExecutePayloadResponse, @@ -110,7 +110,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { private readonly rpcFetchQueue: JobItemQueue<[EngineRequest], EngineResponse>; private jobQueueProcessor = async ({method, params, methodOpts}: EngineRequest): Promise => { - return this.fetchWithRetries( + return this.rpc.fetchWithRetries( {method, params}, methodOpts ); @@ -126,22 +126,14 @@ export class ExecutionEngineHttp implements IExecutionEngine { metrics?.engineHttpProcessorQueue ); this.logger = logger; - } - protected async fetchWithRetries(payload: RpcPayload

, opts?: ReqOpts): Promise { - try { - const res = await this.rpc.fetchWithRetries(payload, opts); + this.rpc.emitter.on(JsonRpcHttpClientEvent.ERROR, ({error}) => { + this.updateEngineState(getExecutionEngineState({payloadError: error, oldState: this.state})); + }); + + this.rpc.emitter.on(JsonRpcHttpClientEvent.RESPONSE, () => { this.updateEngineState(getExecutionEngineState({targetState: ExecutionEngineState.ONLINE, oldState: this.state})); - return res; - } catch (err) { - this.updateEngineState(getExecutionEngineState({payloadError: err, oldState: this.state})); - - /* - * TODO: For some error cases as abort, we may not want to escalate the error to the caller - * But for now the higher level code handles such cases so we can just rethrow the error - */ - throw err; - } + }); } /** @@ -370,7 +362,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { : ForkSeq[fork] >= ForkSeq.capella ? "engine_getPayloadV2" : "engine_getPayloadV1"; - const payloadResponse = await this.fetchWithRetries< + const payloadResponse = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >( @@ -390,7 +382,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> { const method = "engine_getPayloadBodiesByHashV1"; assertReqSizeLimit(blockHashes.length, 32); - const response = await this.fetchWithRetries< + const response = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >({method, params: [blockHashes]}); @@ -405,7 +397,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { assertReqSizeLimit(blockCount, 32); const start = numToQuantity(startBlockNumber); const count = numToQuantity(blockCount); - const response = await this.fetchWithRetries< + const response = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >({method, params: [start, count]}); diff --git a/packages/beacon-node/src/execution/engine/utils.ts b/packages/beacon-node/src/execution/engine/utils.ts index 13ad8d855062..e661af8daf70 100644 --- a/packages/beacon-node/src/execution/engine/utils.ts +++ b/packages/beacon-node/src/execution/engine/utils.ts @@ -1,7 +1,13 @@ import {isFetchError} from "@lodestar/api"; import {isErrorAborted} from "@lodestar/utils"; import {IJson, RpcPayload} from "../../eth1/interface.js"; -import {IJsonRpcHttpClient, ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js"; +import { + IJsonRpcHttpClient, + ErrorJsonRpcResponse, + HttpRpcError, + JsonRpcHttpClientEventEmitter, + JsonRpcHttpClientEvent, +} from "../../eth1/provider/jsonRpcHttpClient.js"; import {isQueueErrorAborted} from "../../util/queue/errors.js"; import {ExecutionPayloadStatus, ExecutionEngineState} from "./interface.js"; @@ -11,16 +17,20 @@ export type JsonRpcBackend = { }; export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient { + readonly emitter = new JsonRpcHttpClientEventEmitter(); + constructor(private readonly backend: JsonRpcBackend) {} async fetch(payload: RpcPayload

): Promise { - const handler = this.backend.handlers[payload.method]; - if (handler === undefined) { - throw Error(`Unknown method ${payload.method}`); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - return handler(...(payload.params as any[])) as R; + return this.wrapWithEvents(async () => { + const handler = this.backend.handlers[payload.method]; + if (handler === undefined) { + throw Error(`Unknown method ${payload.method}`); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return handler(...(payload.params as any[])) as R; + }, payload); } fetchWithRetries(payload: RpcPayload

): Promise { @@ -30,6 +40,17 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient { fetchBatch(rpcPayloadArr: RpcPayload[]): Promise { return Promise.all(rpcPayloadArr.map((payload) => this.fetch(payload))); } + + private async wrapWithEvents(func: () => Promise, payload?: unknown): Promise { + try { + const response = await func(); + this.emitter.emit(JsonRpcHttpClientEvent.RESPONSE, {payload, response}); + return response; + } catch (error) { + this.emitter.emit(JsonRpcHttpClientEvent.ERROR, {payload, error: error as Error}); + throw error; + } + } } export const HTTP_FATAL_ERROR_CODES = ["ECONNREFUSED", "ENOTFOUND", "EAI_AGAIN"]; diff --git a/packages/beacon-node/test/unit/chain/bls/bls.test.ts b/packages/beacon-node/test/unit/chain/bls/bls.test.ts index 89a5e48a9db9..f2da1d0a886b 100644 --- a/packages/beacon-node/test/unit/chain/bls/bls.test.ts +++ b/packages/beacon-node/test/unit/chain/bls/bls.test.ts @@ -4,7 +4,7 @@ import {CoordType} from "@chainsafe/blst"; import {PublicKey} from "@chainsafe/bls/types"; import {ISignatureSet, SignatureSetType} from "@lodestar/state-transition"; import {BlsSingleThreadVerifier} from "../../../../src/chain/bls/singleThread.js"; -import {BlsMultiThreadWorkerPool} from "../../../../lib/chain/bls/multithread/index.js"; +import {BlsMultiThreadWorkerPool} from "../../../../src/chain/bls/multithread/index.js"; import {testLogger} from "../../../utils/logger.js"; describe("BlsVerifier ", function () { diff --git a/packages/beacon-node/test/unit/chain/genesis/genesis.test.ts b/packages/beacon-node/test/unit/chain/genesis/genesis.test.ts index 78be9a88e4be..eb117646f7fc 100644 --- a/packages/beacon-node/test/unit/chain/genesis/genesis.test.ts +++ b/packages/beacon-node/test/unit/chain/genesis/genesis.test.ts @@ -10,7 +10,7 @@ import {ErrorAborted} from "@lodestar/utils"; import {GenesisBuilder} from "../../../../src/chain/genesis/genesis.js"; import {testLogger} from "../../../utils/logger.js"; import {ZERO_HASH_HEX} from "../../../../src/constants/index.js"; -import {EthJsonRpcBlockRaw, IEth1Provider} from "../../../../src/eth1/interface.js"; +import {Eth1ProviderState, EthJsonRpcBlockRaw, IEth1Provider} from "../../../../src/eth1/interface.js"; describe("genesis builder", function () { const logger = testLogger(); @@ -62,6 +62,7 @@ describe("genesis builder", function () { validateContract: async () => { return; }, + getState: () => Eth1ProviderState.ONLINE, ...eth1Provider, }; } diff --git a/packages/beacon-node/test/unit/eth1/eth1MergeBlockTracker.test.ts b/packages/beacon-node/test/unit/eth1/eth1MergeBlockTracker.test.ts index f9dafcf1fe7e..828573bbb06b 100644 --- a/packages/beacon-node/test/unit/eth1/eth1MergeBlockTracker.test.ts +++ b/packages/beacon-node/test/unit/eth1/eth1MergeBlockTracker.test.ts @@ -5,7 +5,7 @@ import {sleep} from "@lodestar/utils"; import {IEth1Provider} from "../../../src/index.js"; import {ZERO_HASH} from "../../../src/constants/index.js"; import {Eth1MergeBlockTracker, StatusCode, toPowBlock} from "../../../src/eth1/eth1MergeBlockTracker.js"; -import {EthJsonRpcBlockRaw} from "../../../src/eth1/interface.js"; +import {Eth1ProviderState, EthJsonRpcBlockRaw} from "../../../src/eth1/interface.js"; import {testLogger} from "../../utils/logger.js"; /* eslint-disable @typescript-eslint/naming-convention */ @@ -57,6 +57,7 @@ describe("eth1 / Eth1MergeBlockTracker", () => { validateContract: async (): Promise => { throw Error("Not implemented"); }, + getState: () => Eth1ProviderState.ONLINE, }; const eth1MergeBlockTracker = new Eth1MergeBlockTracker( @@ -133,6 +134,7 @@ describe("eth1 / Eth1MergeBlockTracker", () => { validateContract: async (): Promise => { throw Error("Not implemented"); }, + getState: () => Eth1ProviderState.ONLINE, }; await runFindMergeBlockTest(eth1Provider, blocks[blocks.length - 1]); @@ -216,6 +218,7 @@ describe("eth1 / Eth1MergeBlockTracker", () => { validateContract: async (): Promise => { throw Error("Not implemented"); }, + getState: () => Eth1ProviderState.ONLINE, }; } diff --git a/packages/utils/src/waitFor.ts b/packages/utils/src/waitFor.ts index 91206267e6ca..ea4fdf91b9ed 100644 --- a/packages/utils/src/waitFor.ts +++ b/packages/utils/src/waitFor.ts @@ -51,3 +51,33 @@ export function waitFor(condition: () => boolean, opts: WaitForOpts = {}): Promi }; }); } + +export interface ElapsedTimeTracker { + (): boolean; + msSinceLastCall: number; +} + +/** + * Create a tracker which keeps track of the last time a function was called + * + * @param durationMs + * @returns + */ +export function createElapsedTimeTracker({minElapsedTime}: {minElapsedTime: number}): ElapsedTimeTracker { + // Initialized with undefined as the function has not been called yet + let lastTimeCalled: number | undefined = undefined; + + function elapsedTimeTracker(): boolean { + const now = Date.now(); + const msSinceLastCall = now - (lastTimeCalled ?? 0); + lastTimeCalled = now; + + return msSinceLastCall > minElapsedTime; + } + + return Object.assign(elapsedTimeTracker, { + get msSinceLastCall() { + return Date.now() - (lastTimeCalled ?? 0); + }, + }); +} diff --git a/packages/utils/test/unit/waitFor.test.ts b/packages/utils/test/unit/waitFor.test.ts index 1dd3dec766b7..d659be3d4bcb 100644 --- a/packages/utils/test/unit/waitFor.test.ts +++ b/packages/utils/test/unit/waitFor.test.ts @@ -1,7 +1,8 @@ import "../setup.js"; import {expect} from "chai"; -import {waitFor} from "../../src/waitFor.js"; +import {waitFor, createElapsedTimeTracker} from "../../src/waitFor.js"; import {ErrorAborted, TimeoutError} from "../../src/errors.js"; +import {sleep} from "../../src/sleep.js"; describe("waitFor", () => { const interval = 10; @@ -35,3 +36,29 @@ describe("waitFor", () => { await expect(waitFor(() => true, {interval, timeout, signal: controller.signal})).to.be.rejectedWith(ErrorAborted); }); }); + +describe("waitForElapsedTime", () => { + it("should true for the first time", () => { + const callIfTimePassed = createElapsedTimeTracker({minElapsedTime: 1000}); + + expect(callIfTimePassed()).to.be.true; + }); + + it("should return true after the minElapsedTime has passed", async () => { + const callIfTimePassed = createElapsedTimeTracker({minElapsedTime: 100}); + callIfTimePassed(); + + await sleep(150); + + expect(callIfTimePassed()).to.be.true; + }); + + it("should return false before the minElapsedTime has passed", async () => { + const callIfTimePassed = createElapsedTimeTracker({minElapsedTime: 100}); + callIfTimePassed(); + + await sleep(10); + + expect(callIfTimePassed()).to.be.false; + }); +});