Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(reqresp): improve the reqresp request/response methods #6801

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export interface ReqRespBeaconNodeModules {
getHandler: GetReqRespHandlerFn;
}

export type ReqRespBeaconNodeOpts = ReqRespOpts;
export type ReqRespBeaconNodeOpts = Partial<ReqRespOpts>;

/**
* Implementation of Ethereum Consensus p2p Req/Resp domain.
Expand Down
64 changes: 39 additions & 25 deletions packages/reqresp/src/ReqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {Libp2p} from "libp2p";
import {Logger, MetricsRegister} from "@lodestar/utils";
import {getMetrics, Metrics} from "./metrics.js";
import {RequestError, RequestErrorCode, sendRequest, SendRequestOpts} from "./request/index.js";
import {handleRequest} from "./response/index.js";
import {sendResponse} from "./response/index.js";
import {
DialOnlyProtocol,
Encoding,
Expand All @@ -16,6 +16,12 @@ import {
} from "./types.js";
import {formatProtocolID} from "./utils/protocolId.js";
import {ReqRespRateLimiter} from "./rate_limiter/ReqRespRateLimiter.js";
import {
DEFAULT_DIAL_TIMEOUT,
DEFAULT_REQUEST_TIMEOUT,
DEFAULT_RESP_TIMEOUT,
DEFAULT_TTFB_TIMEOUT,
} from "./constants.js";

type ProtocolID = string;

Expand Down Expand Up @@ -48,6 +54,13 @@ export class ReqResp {
protected readonly libp2p: Libp2p;
protected readonly logger: Logger;
protected readonly metrics: Metrics | null;
protected readonly timeouts: SendRequestOpts = {
requestTimeoutMs: DEFAULT_REQUEST_TIMEOUT,
dialTimeoutMs: DEFAULT_DIAL_TIMEOUT,
respTimeoutMs: DEFAULT_RESP_TIMEOUT,
ttfbTimeoutMs: DEFAULT_TTFB_TIMEOUT,
};
protected getPeerLogMetadata?: (peerId: string) => string;

// to not be used by extending class
private readonly rateLimiter: ReqRespRateLimiter;
Expand All @@ -60,15 +73,18 @@ export class ReqResp {
private readonly registeredProtocols = new Map<ProtocolID, MixedProtocol>();
private readonly dialOnlyProtocols = new Map<ProtocolID, boolean>();

constructor(
modules: ReqRespProtocolModules,
private readonly opts: ReqRespOpts = {}
) {
constructor(modules: ReqRespProtocolModules, opts?: Partial<ReqRespOpts>) {
this.libp2p = modules.libp2p;
this.logger = modules.logger;
this.metrics = modules.metricsRegister ? getMetrics(modules.metricsRegister) : null;
this.protocolPrefix = opts.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX;
this.protocolPrefix = opts?.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX;
this.rateLimiter = new ReqRespRateLimiter(opts);
this.getPeerLogMetadata = opts?.getPeerLogMetadata;

this.timeouts.requestTimeoutMs = opts?.requestTimeoutMs ?? this.timeouts.requestTimeoutMs;
this.timeouts.respTimeoutMs = opts?.respTimeoutMs ?? this.timeouts.respTimeoutMs;
this.timeouts.dialTimeoutMs = opts?.dialTimeoutMs ?? this.timeouts.dialTimeoutMs;
this.timeouts.ttfbTimeoutMs = opts?.ttfbTimeoutMs ?? this.timeouts.ttfbTimeoutMs;
}

/**
Expand Down Expand Up @@ -155,7 +171,7 @@ export class ReqResp {
encoding: Encoding,
body: Uint8Array
): AsyncIterable<ResponseIncoming> {
const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString());
const peerClient = this.getPeerLogMetadata?.(peerId.toString());
this.metrics?.outgoingRequests.inc({method});
const timer = this.metrics?.outgoingRequestRoundtripTime.startTimer({method});

Expand All @@ -173,16 +189,14 @@ export class ReqResp {
}

try {
yield* sendRequest(
{logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient},
peerId,
yield* sendRequest(peerId, protocolIDs, {
...this.timeouts,
requestBody: body,
modules: {logger: this.logger, libp2p: this.libp2p, metrics: this.metrics, peerClient},
protocols,
protocolIDs,
body,
this.controller.signal,
this.opts,
this.reqCount++
);
signal: this.controller.signal,
requestId: this.reqCount++,
});
} catch (e) {
this.metrics?.outgoingErrors.inc({method});

Expand All @@ -207,7 +221,7 @@ export class ReqResp {
}

const peerId = connection.remotePeer;
const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString());
const peerClient = this.getPeerLogMetadata?.(peerId.toString());
const {method} = protocol;

this.metrics?.incomingRequests.inc({method});
Expand All @@ -216,18 +230,18 @@ export class ReqResp {
this.onIncomingRequest?.(peerId, protocol);

try {
await handleRequest({
logger: this.logger,
metrics: this.metrics,
await sendResponse(peerId, protocolID, {
modules: {
logger: this.logger,
metrics: this.metrics,
rateLimiter: this.rateLimiter,
peerClient,
},
stream,
peerId,
protocol: protocol as Protocol,
protocolID,
rateLimiter: this.rateLimiter,
signal: this.controller.signal,
requestId: this.reqCount++,
peerClient,
requestTimeoutMs: this.opts.requestTimeoutMs,
requestTimeoutMs: this.timeouts.requestTimeoutMs,
});
// TODO: Do success peer scoring here
} catch (err) {
Expand Down
5 changes: 5 additions & 0 deletions packages/reqresp/src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration
export const DEFAULT_DIAL_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_TTFB_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_RESP_TIMEOUT = 10 * 1000; // 10 sec
54 changes: 28 additions & 26 deletions packages/reqresp/src/request/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@ import {RequestError, RequestErrorCode, responseStatusErrorToRequestError} from

export {RequestError, RequestErrorCode};

// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration
export const DEFAULT_DIAL_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_TTFB_TIMEOUT = 5 * 1000; // 5 sec
export const DEFAULT_RESP_TIMEOUT = 10 * 1000; // 10 sec

export interface SendRequestOpts {
/** The maximum time for complete response transfer. */
respTimeoutMs?: number;
respTimeoutMs: number;
/** Non-spec timeout from sending request until write stream closed by responder */
requestTimeoutMs?: number;
requestTimeoutMs: number;
/** The maximum time to wait for first byte of request response (time-to-first-byte). */
ttfbTimeoutMs?: number;
ttfbTimeoutMs: number;
/** Non-spec timeout from dialing protocol until stream opened */
dialTimeoutMs?: number;
dialTimeoutMs: number;
}

type SendRequestModules = {
Expand All @@ -42,31 +36,39 @@ type SendRequestModules = {
*
* 1. Dial peer, establish duplex stream
* 2. Encoded and write request to peer. Expect the responder to close the stream's write side
* 3. Read and decode reponse(s) from peer. Will close the read stream if:
* 3. Read and decode response(s) from peer. Will close the read stream if:
* - An error result is received in one of the chunks. Reads the error_message and throws.
* - The responder closes the stream. If at the end or start of a <response_chunk>, return. Otherwise throws
* - Any part of the response_chunk fails validation. Throws a typed error (see `SszSnappyError`)
* - The maximum number of requested chunks are read. Does not throw, returns read chunks only.
*/
export async function* sendRequest(
{logger, libp2p, metrics, peerClient}: SendRequestModules,
peerId: PeerId,
protocols: MixedProtocol[],
protocolIDs: string[],
requestBody: Uint8Array,
signal?: AbortSignal,
opts?: SendRequestOpts,
requestId = 0
options: SendRequestOpts & {
requestBody: Uint8Array;
modules: SendRequestModules;
protocols: MixedProtocol[];
signal?: AbortSignal;
requestId?: number;
}
): AsyncIterable<ResponseIncoming> {
const {
modules: {libp2p, peerClient, logger, metrics},
protocols,
requestId = 0,
requestBody,
signal,
requestTimeoutMs,
dialTimeoutMs,
ttfbTimeoutMs,
respTimeoutMs,
} = options;

if (protocols.length === 0) {
throw Error("sendRequest must set > 0 protocols");
}

const DIAL_TIMEOUT = opts?.dialTimeoutMs ?? DEFAULT_DIAL_TIMEOUT;
const REQUEST_TIMEOUT = opts?.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT;
const TTFB_TIMEOUT = opts?.ttfbTimeoutMs ?? DEFAULT_TTFB_TIMEOUT;
const RESP_TIMEOUT = opts?.respTimeoutMs ?? DEFAULT_RESP_TIMEOUT;

const peerIdStrShort = prettyPrintPeerId(peerId);
const {method, encoding, version} = protocols[0];
const logCtx = {method, version, encoding, client: peerClient, peer: peerIdStrShort, requestId};
Expand Down Expand Up @@ -102,7 +104,7 @@ export async function* sendRequest(
if (!conn) throw Error("dialProtocol timeout");
return conn;
},
DIAL_TIMEOUT,
dialTimeoutMs,
signal
).catch((e: Error) => {
if (e instanceof TimeoutError) {
Expand All @@ -127,7 +129,7 @@ export async function* sendRequest(

// REQUEST_TIMEOUT: Non-spec timeout from sending request until write stream closed by responder
// Note: libp2p.stop() will close all connections, so not necessary to abort this pipe on parent stop
await withTimeout(() => pipe(requestEncode(protocol, requestBody), stream.sink), REQUEST_TIMEOUT, signal).catch(
await withTimeout(() => pipe(requestEncode(protocol, requestBody), stream.sink), requestTimeoutMs, signal).catch(
(e) => {
// Must close the stream read side (stream.source) manually AND the write side
stream.abort(e);
Expand Down Expand Up @@ -155,12 +157,12 @@ export async function* sendRequest(
const ttfbTimeoutController = new AbortController();
const respTimeoutController = new AbortController();

const timeoutTTFB = setTimeout(() => ttfbTimeoutController.abort(), TTFB_TIMEOUT);
const timeoutTTFB = setTimeout(() => ttfbTimeoutController.abort(), ttfbTimeoutMs);
let timeoutRESP: NodeJS.Timeout | null = null;

const restartRespTimeout = (): void => {
if (timeoutRESP) clearTimeout(timeoutRESP);
timeoutRESP = setTimeout(() => respTimeoutController.abort(), RESP_TIMEOUT);
timeoutRESP = setTimeout(() => respTimeoutController.abort(), respTimeoutMs);
};

try {
Expand Down
50 changes: 22 additions & 28 deletions packages/reqresp/src/response/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,20 @@ import {ResponseError} from "./errors.js";

export {ResponseError};

// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration
export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec

export interface HandleRequestOpts {
logger: Logger;
metrics: Metrics | null;
export interface SendResponseOpts {
modules: {
logger: Logger;
metrics: Metrics | null;
rateLimiter: ReqRespRateLimiter;
/** Peer client type for logging and metrics: 'prysm' | 'lighthouse' */
peerClient?: string;
};
stream: Stream;
peerId: PeerId;
protocol: Protocol;
protocolID: string;
rateLimiter: ReqRespRateLimiter;
signal?: AbortSignal;
requestId?: number;
/** Peer client type for logging and metrics: 'prysm' | 'lighthouse' */
peerClient?: string;
/** Non-spec timeout from sending request until write stream closed by responder */
requestTimeoutMs?: number;
requestTimeoutMs: number;
}

/**
Expand All @@ -43,21 +40,18 @@ export interface HandleRequestOpts {
* 4a. Encode and write `<response_chunks>` to peer
* 4b. On error, encode and write an error `<response_chunk>` and stop
*/
export async function handleRequest({
logger,
metrics,
stream,
peerId,
protocol,
protocolID,
rateLimiter,
signal,
requestId = 0,
peerClient = "unknown",
requestTimeoutMs,
}: HandleRequestOpts): Promise<void> {
const REQUEST_TIMEOUT = requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT;

export async function sendResponse(
peerId: PeerId,
protocolID: string,
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this function signature go from f({a, b, c, d, e}) to f(a, b, {c, modules: {d, e}})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it consistent for request and response methods. Now both functions sendResponse and sendRequest have same function signature with similar options.

modules: {logger, metrics, rateLimiter, peerClient = "unknown"},
protocol,
signal,
stream,
requestId = 0,
requestTimeoutMs,
}: SendResponseOpts
): Promise<void> {
const logCtx = {method: protocol.method, client: peerClient, peer: prettyPrintPeerId(peerId), requestId};

let responseError: Error | null = null;
Expand All @@ -72,7 +66,7 @@ export async function handleRequest({

const requestBody = await withTimeout(
() => pipe(stream.source as AsyncIterable<Uint8ArrayList>, requestDecode(protocol)),
REQUEST_TIMEOUT,
requestTimeoutMs,
signal
).catch((e: unknown) => {
if (e instanceof TimeoutError) {
Expand Down
Loading
Loading