diff --git a/packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts b/packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts index 2a20a3f1f7fd..3787d59e5e84 100644 --- a/packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts +++ b/packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts @@ -50,7 +50,7 @@ export interface ReqRespBeaconNodeModules { getHandler: GetReqRespHandlerFn; } -export type ReqRespBeaconNodeOpts = ReqRespOpts; +export type ReqRespBeaconNodeOpts = Partial; /** * Implementation of Ethereum Consensus p2p Req/Resp domain. diff --git a/packages/reqresp/src/ReqResp.ts b/packages/reqresp/src/ReqResp.ts index dc1459d87497..80bf0be4a0e6 100644 --- a/packages/reqresp/src/ReqResp.ts +++ b/packages/reqresp/src/ReqResp.ts @@ -4,7 +4,7 @@ import type {Libp2p} from "libp2p"; import {Logger, MetricsRegister} from "@lodestar/utils"; import {getMetrics, Metrics} from "./metrics.js"; import {RequestError, RequestErrorCode, sendRequest, SendRequestOpts} from "./request/index.js"; -import {handleRequest} from "./response/index.js"; +import {sendResponse} from "./response/index.js"; import { DialOnlyProtocol, Encoding, @@ -16,6 +16,12 @@ import { } from "./types.js"; import {formatProtocolID} from "./utils/protocolId.js"; import {ReqRespRateLimiter} from "./rate_limiter/ReqRespRateLimiter.js"; +import { + DEFAULT_DIAL_TIMEOUT, + DEFAULT_REQUEST_TIMEOUT, + DEFAULT_RESP_TIMEOUT, + DEFAULT_TTFB_TIMEOUT, +} from "./constants.js"; type ProtocolID = string; @@ -48,6 +54,13 @@ export class ReqResp { protected readonly libp2p: Libp2p; protected readonly logger: Logger; protected readonly metrics: Metrics | null; + protected readonly timeouts: SendRequestOpts = { + requestTimeoutMs: DEFAULT_REQUEST_TIMEOUT, + dialTimeoutMs: DEFAULT_DIAL_TIMEOUT, + respTimeoutMs: DEFAULT_RESP_TIMEOUT, + ttfbTimeoutMs: DEFAULT_TTFB_TIMEOUT, + }; + protected getPeerLogMetadata?: (peerId: string) => string; // to not be used by extending class private readonly rateLimiter: ReqRespRateLimiter; @@ -60,15 +73,18 @@ export class ReqResp { private readonly registeredProtocols = new Map(); private readonly dialOnlyProtocols = new Map(); - constructor( - modules: ReqRespProtocolModules, - private readonly opts: ReqRespOpts = {} - ) { + constructor(modules: ReqRespProtocolModules, opts?: Partial) { this.libp2p = modules.libp2p; this.logger = modules.logger; this.metrics = modules.metricsRegister ? getMetrics(modules.metricsRegister) : null; - this.protocolPrefix = opts.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX; + this.protocolPrefix = opts?.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX; this.rateLimiter = new ReqRespRateLimiter(opts); + this.getPeerLogMetadata = opts?.getPeerLogMetadata; + + this.timeouts.requestTimeoutMs = opts?.requestTimeoutMs ?? this.timeouts.requestTimeoutMs; + this.timeouts.respTimeoutMs = opts?.respTimeoutMs ?? this.timeouts.respTimeoutMs; + this.timeouts.dialTimeoutMs = opts?.dialTimeoutMs ?? this.timeouts.dialTimeoutMs; + this.timeouts.ttfbTimeoutMs = opts?.ttfbTimeoutMs ?? this.timeouts.ttfbTimeoutMs; } /** @@ -155,7 +171,7 @@ export class ReqResp { encoding: Encoding, body: Uint8Array ): AsyncIterable { - const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString()); + const peerClient = this.getPeerLogMetadata?.(peerId.toString()); this.metrics?.outgoingRequests.inc({method}); const timer = this.metrics?.outgoingRequestRoundtripTime.startTimer({method}); @@ -173,16 +189,14 @@ export class ReqResp { } try { - yield* sendRequest( - {logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient}, - peerId, + yield* sendRequest(peerId, protocolIDs, { + ...this.timeouts, + requestBody: body, + modules: {logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient}, protocols, - protocolIDs, - body, - this.controller.signal, - this.opts, - this.reqCount++ - ); + signal: this.controller.signal, + requestId: this.reqCount++, + }); } catch (e) { this.metrics?.outgoingErrors.inc({method}); @@ -207,7 +221,7 @@ export class ReqResp { } const peerId = connection.remotePeer; - const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString()); + const peerClient = this.getPeerLogMetadata?.(peerId.toString()); const {method} = protocol; this.metrics?.incomingRequests.inc({method}); @@ -216,18 +230,18 @@ export class ReqResp { this.onIncomingRequest?.(peerId, protocol); try { - await handleRequest({ - logger: this.logger, - metrics: this.metrics, + await sendResponse(peerId, protocolID, { + modules: { + logger: this.logger, + metrics: this.metrics, + rateLimiter: this.rateLimiter, + peerClient, + }, stream, - peerId, protocol: protocol as Protocol, - protocolID, - rateLimiter: this.rateLimiter, signal: this.controller.signal, requestId: this.reqCount++, - peerClient, - requestTimeoutMs: this.opts.requestTimeoutMs, + requestTimeoutMs: this.timeouts.requestTimeoutMs, }); // TODO: Do success peer scoring here } catch (err) { diff --git a/packages/reqresp/src/constants.ts b/packages/reqresp/src/constants.ts new file mode 100644 index 000000000000..598e34a14cb2 --- /dev/null +++ b/packages/reqresp/src/constants.ts @@ -0,0 +1,5 @@ +// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration +export const DEFAULT_DIAL_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_TTFB_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_RESP_TIMEOUT = 10 * 1000; // 10 sec diff --git a/packages/reqresp/src/request/index.ts b/packages/reqresp/src/request/index.ts index 4920a32eb221..345e11156bf2 100644 --- a/packages/reqresp/src/request/index.ts +++ b/packages/reqresp/src/request/index.ts @@ -13,21 +13,15 @@ import {RequestError, RequestErrorCode, responseStatusErrorToRequestError} from export {RequestError, RequestErrorCode}; -// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration -export const DEFAULT_DIAL_TIMEOUT = 5 * 1000; // 5 sec -export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec -export const DEFAULT_TTFB_TIMEOUT = 5 * 1000; // 5 sec -export const DEFAULT_RESP_TIMEOUT = 10 * 1000; // 10 sec - export interface SendRequestOpts { /** The maximum time for complete response transfer. */ - respTimeoutMs?: number; + respTimeoutMs: number; /** Non-spec timeout from sending request until write stream closed by responder */ - requestTimeoutMs?: number; + requestTimeoutMs: number; /** The maximum time to wait for first byte of request response (time-to-first-byte). */ - ttfbTimeoutMs?: number; + ttfbTimeoutMs: number; /** Non-spec timeout from dialing protocol until stream opened */ - dialTimeoutMs?: number; + dialTimeoutMs: number; } type SendRequestModules = { @@ -42,31 +36,39 @@ type SendRequestModules = { * * 1. Dial peer, establish duplex stream * 2. Encoded and write request to peer. Expect the responder to close the stream's write side - * 3. Read and decode reponse(s) from peer. Will close the read stream if: + * 3. Read and decode response(s) from peer. Will close the read stream if: * - An error result is received in one of the chunks. Reads the error_message and throws. * - The responder closes the stream. If at the end or start of a , return. Otherwise throws * - Any part of the response_chunk fails validation. Throws a typed error (see `SszSnappyError`) * - The maximum number of requested chunks are read. Does not throw, returns read chunks only. */ export async function* sendRequest( - {logger, libp2p, metrics, peerClient}: SendRequestModules, peerId: PeerId, - protocols: MixedProtocol[], protocolIDs: string[], - requestBody: Uint8Array, - signal?: AbortSignal, - opts?: SendRequestOpts, - requestId = 0 + options: SendRequestOpts & { + requestBody: Uint8Array; + modules: SendRequestModules; + protocols: MixedProtocol[]; + signal?: AbortSignal; + requestId?: number; + } ): AsyncIterable { + const { + modules: {libp2p, peerClient, logger, metrics}, + protocols, + requestId = 0, + requestBody, + signal, + requestTimeoutMs, + dialTimeoutMs, + ttfbTimeoutMs, + respTimeoutMs, + } = options; + if (protocols.length === 0) { throw Error("sendRequest must set > 0 protocols"); } - const DIAL_TIMEOUT = opts?.dialTimeoutMs ?? DEFAULT_DIAL_TIMEOUT; - const REQUEST_TIMEOUT = opts?.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT; - const TTFB_TIMEOUT = opts?.ttfbTimeoutMs ?? DEFAULT_TTFB_TIMEOUT; - const RESP_TIMEOUT = opts?.respTimeoutMs ?? DEFAULT_RESP_TIMEOUT; - const peerIdStrShort = prettyPrintPeerId(peerId); const {method, encoding, version} = protocols[0]; const logCtx = {method, version, encoding, client: peerClient, peer: peerIdStrShort, requestId}; @@ -102,7 +104,7 @@ export async function* sendRequest( if (!conn) throw Error("dialProtocol timeout"); return conn; }, - DIAL_TIMEOUT, + dialTimeoutMs, signal ).catch((e: Error) => { if (e instanceof TimeoutError) { @@ -127,7 +129,7 @@ export async function* sendRequest( // REQUEST_TIMEOUT: Non-spec timeout from sending request until write stream closed by responder // Note: libp2p.stop() will close all connections, so not necessary to abort this pipe on parent stop - await withTimeout(() => pipe(requestEncode(protocol, requestBody), stream.sink), REQUEST_TIMEOUT, signal).catch( + await withTimeout(() => pipe(requestEncode(protocol, requestBody), stream.sink), requestTimeoutMs, signal).catch( (e) => { // Must close the stream read side (stream.source) manually AND the write side stream.abort(e); @@ -155,12 +157,12 @@ export async function* sendRequest( const ttfbTimeoutController = new AbortController(); const respTimeoutController = new AbortController(); - const timeoutTTFB = setTimeout(() => ttfbTimeoutController.abort(), TTFB_TIMEOUT); + const timeoutTTFB = setTimeout(() => ttfbTimeoutController.abort(), ttfbTimeoutMs); let timeoutRESP: NodeJS.Timeout | null = null; const restartRespTimeout = (): void => { if (timeoutRESP) clearTimeout(timeoutRESP); - timeoutRESP = setTimeout(() => respTimeoutController.abort(), RESP_TIMEOUT); + timeoutRESP = setTimeout(() => respTimeoutController.abort(), respTimeoutMs); }; try { diff --git a/packages/reqresp/src/response/index.ts b/packages/reqresp/src/response/index.ts index 27758caa3f24..eb81026914db 100644 --- a/packages/reqresp/src/response/index.ts +++ b/packages/reqresp/src/response/index.ts @@ -14,23 +14,20 @@ import {ResponseError} from "./errors.js"; export {ResponseError}; -// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration -export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec - -export interface HandleRequestOpts { - logger: Logger; - metrics: Metrics | null; +export interface SendResponseOpts { + modules: { + logger: Logger; + metrics: Metrics | null; + rateLimiter: ReqRespRateLimiter; + /** Peer client type for logging and metrics: 'prysm' | 'lighthouse' */ + peerClient?: string; + }; stream: Stream; - peerId: PeerId; protocol: Protocol; - protocolID: string; - rateLimiter: ReqRespRateLimiter; signal?: AbortSignal; requestId?: number; - /** Peer client type for logging and metrics: 'prysm' | 'lighthouse' */ - peerClient?: string; /** Non-spec timeout from sending request until write stream closed by responder */ - requestTimeoutMs?: number; + requestTimeoutMs: number; } /** @@ -43,21 +40,18 @@ export interface HandleRequestOpts { * 4a. Encode and write `` to peer * 4b. On error, encode and write an error `` and stop */ -export async function handleRequest({ - logger, - metrics, - stream, - peerId, - protocol, - protocolID, - rateLimiter, - signal, - requestId = 0, - peerClient = "unknown", - requestTimeoutMs, -}: HandleRequestOpts): Promise { - const REQUEST_TIMEOUT = requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT; - +export async function sendResponse( + peerId: PeerId, + protocolID: string, + { + modules: {logger, metrics, rateLimiter, peerClient = "unknown"}, + protocol, + signal, + stream, + requestId = 0, + requestTimeoutMs, + }: SendResponseOpts +): Promise { const logCtx = {method: protocol.method, client: peerClient, peer: prettyPrintPeerId(peerId), requestId}; let responseError: Error | null = null; @@ -72,7 +66,7 @@ export async function handleRequest({ const requestBody = await withTimeout( () => pipe(stream.source as AsyncIterable, requestDecode(protocol)), - REQUEST_TIMEOUT, + requestTimeoutMs, signal ).catch((e: unknown) => { if (e instanceof TimeoutError) { diff --git a/packages/reqresp/test/unit/request/index.test.ts b/packages/reqresp/test/unit/request/index.test.ts index 888520457bb2..8b394f061fd7 100644 --- a/packages/reqresp/test/unit/request/index.test.ts +++ b/packages/reqresp/test/unit/request/index.test.ts @@ -14,6 +14,19 @@ import {responseEncode} from "../../utils/response.js"; import {RespStatus} from "../../../src/interface.js"; import {expectRejectedWithLodestarError} from "../../utils/errors.js"; import {pingProtocol} from "../../fixtures/protocols.js"; +import { + DEFAULT_DIAL_TIMEOUT, + DEFAULT_REQUEST_TIMEOUT, + DEFAULT_RESP_TIMEOUT, + DEFAULT_TTFB_TIMEOUT, +} from "../../../src/constants.js"; + +const DEFAULT_TIMEOUTS = { + requestTimeoutMs: DEFAULT_REQUEST_TIMEOUT, + dialTimeoutMs: DEFAULT_DIAL_TIMEOUT, + ttfbTimeoutMs: DEFAULT_TTFB_TIMEOUT, + respTimeoutMs: DEFAULT_RESP_TIMEOUT, +}; describe("request / sendRequest", () => { const logger = getEmptyLogger(); @@ -70,12 +83,15 @@ describe("request / sendRequest", () => { const responses = await pipe( sendRequest( - {logger, libp2p, metrics: null}, peerId, - protocols, protocols.map((p) => p.method), - EMPTY_REQUEST, - controller.signal + { + modules: {logger, libp2p, metrics: null}, + protocols, + requestBody: EMPTY_REQUEST, + signal: controller.signal, + ...DEFAULT_TIMEOUTS, + } ), all ); @@ -89,13 +105,13 @@ describe("request / sendRequest", () => { const timeoutTestCases: { id: string; - opts?: SendRequestOpts; + opts: SendRequestOpts; source: () => AsyncGenerator; error?: LodestarError; }[] = [ { id: "trigger a TTFB_TIMEOUT", - opts: {ttfbTimeoutMs: 0}, + opts: {...DEFAULT_TIMEOUTS, ttfbTimeoutMs: 0}, source: async function* () { await sleep(30); // Pause for too long before first byte yield sszSnappyPing.chunks[0]; @@ -104,7 +120,7 @@ describe("request / sendRequest", () => { }, { id: "trigger a RESP_TIMEOUT", - opts: {respTimeoutMs: 0}, + opts: {...DEFAULT_TIMEOUTS, respTimeoutMs: 0}, source: async function* () { yield sszSnappyPing.chunks[0]; await sleep(30); // Pause for too long after first byte @@ -115,7 +131,7 @@ describe("request / sendRequest", () => { { // Upstream "abortable-iterator" never throws with an infinite sleep. id: "Infinite sleep on first byte", - opts: {ttfbTimeoutMs: 1, respTimeoutMs: 1}, + opts: {...DEFAULT_TIMEOUTS, ttfbTimeoutMs: 1, respTimeoutMs: 1}, source: async function* () { await sleep(100000, controller.signal); yield sszSnappyPing.chunks[0]; @@ -124,7 +140,7 @@ describe("request / sendRequest", () => { }, { id: "Infinite sleep on second chunk", - opts: {ttfbTimeoutMs: 1, respTimeoutMs: 1}, + opts: {...DEFAULT_TIMEOUTS, ttfbTimeoutMs: 1, respTimeoutMs: 1}, source: async function* () { yield sszSnappyPing.chunks[0]; await sleep(100000, controller.signal); @@ -141,15 +157,13 @@ describe("request / sendRequest", () => { await expectRejectedWithLodestarError( pipe( - sendRequest( - {logger, libp2p, metrics: null}, - peerId, - [emptyProtocol], - [testMethod], - EMPTY_REQUEST, - controller.signal, - opts - ), + sendRequest(peerId, [testMethod], { + modules: {logger, libp2p, metrics: null}, + protocols: [emptyProtocol], + requestBody: EMPTY_REQUEST, + signal: controller.signal, + ...opts, + }), all ), error as LodestarError diff --git a/packages/reqresp/test/unit/response/index.test.ts b/packages/reqresp/test/unit/response/index.test.ts index 62a8e63f3fe0..cc531f6f6f66 100644 --- a/packages/reqresp/test/unit/response/index.test.ts +++ b/packages/reqresp/test/unit/response/index.test.ts @@ -4,12 +4,13 @@ import {LodestarError, fromHex} from "@lodestar/utils"; import {getEmptyLogger} from "@lodestar/logger/empty"; import {Protocol, RespStatus} from "../../../src/index.js"; import {ReqRespRateLimiter} from "../../../src/rate_limiter/ReqRespRateLimiter.js"; -import {handleRequest} from "../../../src/response/index.js"; +import {sendResponse} from "../../../src/response/index.js"; import {sszSnappyPing} from "../../fixtures/messages.js"; import {expectRejectedWithLodestarError} from "../../utils/errors.js"; import {MockLibP2pStream, expectEqualByteChunks} from "../../utils/index.js"; import {getValidPeerId} from "../../utils/peer.js"; import {pingProtocol} from "../../fixtures/protocols.js"; +import {DEFAULT_REQUEST_TIMEOUT} from "../../../src/constants.js"; const testCases: { id: string; @@ -58,15 +59,12 @@ describe("response / handleRequest", () => { const stream = new MockLibP2pStream(requestChunks as any); const rateLimiter = new ReqRespRateLimiter({rateLimitMultiplier: 0}); - const resultPromise = handleRequest({ - logger, - metrics: null, + const resultPromise = sendResponse(peerId, protocol.method, { + modules: {logger, metrics: null, rateLimiter}, protocol, - protocolID: protocol.method, stream, - peerId, signal: controller.signal, - rateLimiter, + requestTimeoutMs: DEFAULT_REQUEST_TIMEOUT, }); // Make sure the test error-ed with expected error, otherwise it's hard to debug with responseChunks