Skip to content

Commit

Permalink
refactor to add and use nodeid computation and clear out nodeid tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Jul 16, 2024
1 parent 467c892 commit 56b2fe7
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 54 deletions.
7 changes: 2 additions & 5 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {Eth2Gossipsub, getCoreTopicsAtFork} from "../gossip/index.js";
import {SyncnetsService} from "../subnets/syncnetsService.js";
import {FORK_EPOCH_LOOKAHEAD, getActiveForks} from "../forks.js";
import {NetworkOptions} from "../options.js";
import {CommitteeSubscription, IAttnetsService} from "../subnets/interface.js";
import {CommitteeSubscription, IAttnetsService, computeNodeId} from "../subnets/interface.js";
import {MetadataController} from "../metadata.js";
import {createNodeJsLibp2p} from "../libp2p/index.js";
import {PeersData} from "../peers/peersData.js";
Expand Down Expand Up @@ -195,10 +195,7 @@ export class NetworkCore implements INetworkCore {
await gossip.start();

const enr = opts.discv5?.enr;
const nodeId = enr ? fromHexString(ENR.decodeTxt(enr).nodeId) : null;
if (nodeId === null) {
throw Error("null node id");
}
const nodeId = computeNodeId(peerId);
const attnetsService = new AttnetsService(config, clock, gossip, metadata, logger, metrics, nodeId, opts);
const syncnetsService = new SyncnetsService(config, clock, gossip, metadata, logger, metrics, opts);

Expand Down
21 changes: 5 additions & 16 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {ENRKey, SubnetType} from "../metadata.js";
import {getConnectionsMap, prettyPrintPeerId} from "../util.js";
import {Discv5Worker} from "../discv5/index.js";
import {LodestarDiscv5Opts} from "../discv5/types.js";
import {NodeId} from "../subnets/interface.js";
import {NodeId, computeNodeId} from "../subnets/interface.js";
import {getCustodyColumnSubnets} from "../../util/dataColumns.js";
import {deserializeEnrSubnets, zeroAttnets, zeroSyncnets} from "./utils/enrSubnetsDeserialize.js";
import {IPeerRpcScoreStore, ScoreState} from "./score/index.js";
Expand All @@ -36,7 +36,7 @@ export type PeerDiscoveryOpts = {
};

export type PeerDiscoveryModules = {
nodeId: NodeId,
nodeId: NodeId;
libp2p: Libp2p;
peerRpcScores: IPeerRpcScoreStore;
metrics: NetworkCoreMetrics | null;
Expand Down Expand Up @@ -103,8 +103,6 @@ export class PeerDiscovery {
private logger: LoggerNode;
private config: BeaconConfig;
private cachedENRs = new Map<PeerIdStr, CachedENR>();
private peerIdToNodeId = new Map<PeerIdStr, NodeId>();
private peerIdToMyEnr = new Map<PeerIdStr, ENR>();
private peerIdToCustodySubnetCount = new Map<PeerIdStr, number>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
Expand Down Expand Up @@ -349,12 +347,6 @@ export class PeerDiscovery {
}
// async due to some crypto that's no longer necessary
const peerId = await enr.peerId();

const nodeId = fromHexString(enr.nodeId);
this.peerIdToNodeId.set(peerId.toString(), nodeId);
this.peerIdToMyEnr.set(peerId.toString(), enr);
pruneSetToMax(this.peerIdToNodeId, MAX_CACHED_NODEIDS);

// tcp multiaddr is known to be be present, checked inside the worker
const multiaddrTCP = enr.getLocationMultiaddr(ENRKey.tcp);
if (!multiaddrTCP) {
Expand Down Expand Up @@ -393,8 +385,8 @@ export class PeerDiscovery {
syncnets: boolean[],
custodySubnetCount: number
): DiscoveredPeerStatus {
const nodeId = this.peerIdToNodeId.get(peerId.toString());
this.logger.warn("handleDiscoveredPeer", {nodeId: nodeId ? toHexString(nodeId) : null, peerId: peerId.toString()});
const nodeId = computeNodeId(peerId);
this.logger.warn("handleDiscoveredPeer", {nodeId: toHexString(nodeId), peerId: peerId.toString()});
try {
// Check if peer is not banned or disconnected
if (this.peerRpcScores.getScoreState(peerId) !== ScoreState.Healthy) {
Expand Down Expand Up @@ -442,10 +434,7 @@ export class PeerDiscovery {
}

private shouldDialPeer(peer: CachedENR): boolean {
const nodeId = this.peerIdToNodeId.get(peer.peerId.toString());
if (nodeId === undefined) {
return false;
}
const nodeId = computeNodeId(peer.peerId);
const peerCustodySubnetCount = peer.custodySubnetCount;
const peerCustodySubnets = getCustodyColumnSubnets(nodeId, peerCustodySubnetCount);

Expand Down
22 changes: 8 additions & 14 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {NetworkEvent, INetworkEventBus, NetworkEventData} from "../events.js";
import {Libp2p} from "../interface.js";
import {ReqRespMethod} from "../reqresp/ReqRespBeaconNode.js";
import {getConnection, getConnectionsMap, prettyPrintPeerId} from "../util.js";
import {NodeId, SubnetsService} from "../subnets/index.js";
import {NodeId, SubnetsService, computeNodeId} from "../subnets/index.js";
import {SubnetType} from "../metadata.js";
import {Eth2Gossipsub} from "../gossip/gossipsub.js";
import {StatusCache} from "../statusCache.js";
Expand Down Expand Up @@ -392,11 +392,7 @@ export class PeerManager {
peerData.relevantStatus = RelevantPeerStatus.relevant;
}
if (getConnection(this.libp2p, peer.toString())) {
const nodeId = peerData?.nodeId ?? this.discovery?.["peerIdToNodeId"].get(peer.toString());
if (nodeId === undefined) {
this.logger.error("onStatus with nodeId=undefined", {peerId: peer.toString()});
return;
}
const nodeId = peerData?.nodeId ?? computeNodeId(peer);
const custodySubnetCount =
peerData?.custodySubnetCount ?? this.discovery?.["peerIdToCustodySubnetCount"].get(peer.toString());

Expand All @@ -420,19 +416,17 @@ export class PeerManager {
});

if (this.opts.onlyConnectToBiggerDataNodes && !hasAllColumns) {
const enr = this.discovery?.["peerIdToMyEnr"].get(peer.toString());
this.logger.debug(
`ignoring peercontected onlyConnectToBiggerDataNodes=true hasAllColumns=${hasAllColumns}`,
exportENRToJSON(enr)
);
this.logger.debug(`ignoring peercontected onlyConnectToBiggerDataNodes=true hasAllColumns=${hasAllColumns}`, {
nodeId: nodeId ? toHexString(nodeId) : undefined,
peerId: peer.toString(),
});
return;
}

if (this.opts.onlyConnectToMinimalCustodyOverlapNodes && !hasMinCustodyMatchingColumns) {
const enr = this.discovery?.["peerIdToMyEnr"].get(peer.toString());
this.logger.debug(
`ignoring peercontected onlyConnectToMinimalCustodyOverlapNodes=true hasMinCustodyMatchingColumns=${hasMinCustodyMatchingColumns}`,
exportENRToJSON(enr)
{nodeId: nodeId ? toHexString(nodeId) : undefined, peerId: peer.toString()}
);
return;
}
Expand Down Expand Up @@ -649,7 +643,7 @@ export class PeerManager {
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
const now = Date.now();
const nodeId = this.discovery?.["peerIdToNodeId"].get(remotePeer.toString()) ?? null;
const nodeId = computeNodeId(remotePeer);
const custodySubnetCount = this.discovery?.["peerIdToCustodySubnetCount"].get(remotePeer.toString()) ?? null;
const peerData: PeerData = {
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {ENR} from "@chainsafe/enr";
import {toHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, Epoch, phase0, SignedBeaconBlock, Slot, electra, ssz} from "@lodestar/types";
import {ForkSeq, NUMBER_OF_COLUMNS, ForkName} from "@lodestar/params";
Expand All @@ -12,16 +12,14 @@ import {
BlockInputDataBlobs,
BlockInputDataDataColumns,
DataColumnsSource,
DataColumnsCacheMap,
BlockInputType,
getBlockInputDataColumns,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork, WithBytes, WithOptionalBytes} from "../interface.js";
import {CustodyConfig} from "../../util/dataColumns.js";
import {Network} from "../network.js";
import {NetworkCore} from "../core/networkCore.js";
import {getEmptyBlockInputCacheEntry} from "../../chain/seenCache/seenGossipBlockInput.js";
import {computeNodeId} from "../subnets/index.js";

export type PartialDownload = null | {blocks: BlockInput[]; pendingDataColumns: number[]};
export async function beaconBlocksMaybeBlobsByRange(
Expand Down Expand Up @@ -284,16 +282,14 @@ export function matchBlockWithDataColumns(
);

if (dataColumnSidecars.length !== requestedColumns.length || !requestedColumnsPresent) {
const peerEnr = ((network as Network)["core"] as NetworkCore)["peerManager"]?.["discovery"]?.[
"peerIdToMyEnr"
].get(peerId);
console.log(
"matchBlockWithDataColumns",
`Missing or mismatching dataColumnSidecars from peerId=${peerId} for blockSlot=${block.data.message.slot} with numColumns=${custodyColumnsLen} dataColumnSidecars=${dataColumnSidecars.length} requestedColumnsPresent=${requestedColumnsPresent} received dataColumnIndexes=${dataColumnIndexes.join(",")} requested=${requestedColumns.join(",")}`,
{
allBlocks: allBlocks.length,
allDataColumnSidecars: allDataColumnSidecars.length,
peerEnr: exportENRToJSON(peerEnr),
peerId,
nodeId: toHexString(computeNodeId(peerId)),
blobKzgCommitmentsLen,
}
);
Expand Down Expand Up @@ -363,14 +359,3 @@ export function matchBlockWithDataColumns(
}
return blockInputs;
}

function exportENRToJSON(enr?: ENR): Record<string, string | undefined> | undefined {
if (enr === undefined) {
return undefined;
}
return {
ip4: enr.kvs.get("ip")?.toString(),
csc: enr.kvs.get("csc")?.toString(),
nodeId: enr.nodeId,
};
}
17 changes: 17 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import {nodeId as computeENRNodeId, IDScheme} from "@chainsafe/enr";
import type {PeerId} from "@libp2p/interface";
import {peerIdFromString} from "@libp2p/peer-id";
import {fromHexString} from "@chainsafe/ssz";
import {ForkName} from "@lodestar/params";
import {Bytes32, Slot, ValidatorIndex} from "@lodestar/types";
import {RequestedSubnet} from "../peers/utils/index.js";
Expand Down Expand Up @@ -48,3 +52,16 @@ export type GossipSubscriber = {

// uint256 in the spec
export type NodeId = Bytes32;
export function computeNodeId(peerIdOrStr: PeerId | PeerIdStr) {
let peerId;
if (typeof peerId === "string") {
peerId = peerIdFromString(peerId);
} else {
peerId = peerIdOrStr as PeerId;
}

if (peerId.publicKey === undefined) {
throw Error("Undefined publicKey");
}
return fromHexString(computeENRNodeId(IDScheme.v4, peerId.publicKey.slice(4)));
}

0 comments on commit 56b2fe7

Please sign in to comment.