diff --git a/.changeset/mean-tomatoes-beg.md b/.changeset/mean-tomatoes-beg.md new file mode 100644 index 0000000000..eefedfed0c --- /dev/null +++ b/.changeset/mean-tomatoes-beg.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +fix: direct peering data needs to be processed by the worker due to obscure node behavior diff --git a/apps/hubble/src/cli.ts b/apps/hubble/src/cli.ts index 9f407285e7..9252c95811 100644 --- a/apps/hubble/src/cli.ts +++ b/apps/hubble/src/cli.ts @@ -1,8 +1,6 @@ import { FarcasterNetwork, farcasterNetworkFromJSON } from "@farcaster/hub-nodejs"; -import { peerIdFromString } from "@libp2p/peer-id"; import { Ed25519PeerId, PeerId, RSAPeerId, Secp256k1PeerId } from "@libp2p/interface"; import { createEd25519PeerId, createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory"; -import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types"; import { Command } from "commander"; import fs, { existsSync } from "fs"; import { mkdir, readFile, writeFile } from "fs/promises"; @@ -481,32 +479,7 @@ app ); } - const directPeers = ((cliOptions.directPeers ?? hubConfig.directPeers ?? []) as string[]) - .map((a) => parseAddress(a)) - .map((a) => { - if (a.isErr()) { - logger.warn( - { errorCode: a.error.errCode, message: a.error.message }, - "Couldn't parse direct peer address, ignoring", - ); - } else if (a.value.getPeerId()) { - logger.warn( - { errorCode: "unavailable", message: "peer id missing from direct peer" }, - "Direct peer missing peer id, ignoring", - ); - } - - return a; - }) - .filter((a) => a.isOk() && a.value.getPeerId()) - .map((a) => a._unsafeUnwrap()) - .map((a) => { - return { - id: peerIdFromString(a.getPeerId() ?? ""), - addrs: [a], - } as AddrInfo; - }); - + const directPeers = (cliOptions.directPeers ?? hubConfig.directPeers ?? []) as string[]; const rebuildSyncTrie = cliOptions.rebuildSyncTrie ?? hubConfig.rebuildSyncTrie ?? false; const profileSync = cliOptions.profileSync ?? hubConfig.profileSync ?? false; diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 77dda369fc..c4fbf4fc2e 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -305,7 +305,7 @@ export interface HubOptions { pruneEventsJobCron?: string; /** A list of addresses the node directly peers with, provided in MultiAddr format */ - directPeers?: AddrInfo[]; + directPeers?: string[]; /** If set, snapshot sync is disabled */ disableSnapshotSync?: boolean; diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index 60b9f6f789..bd886df060 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -77,7 +77,7 @@ export interface NodeOptions { /** A list of peerIds that are not allowed to connect to this node */ deniedPeerIdStrs?: string[] | undefined; /** A list of addresses the node directly peers with, provided in MultiAddr format */ - directPeers?: AddrInfo[] | undefined; + directPeers?: string[] | undefined; /** Override peer scoring. Useful for tests */ scoreThresholds?: Partial; /** A list of PeerIds that will bypass application-specific peer scoring and return the cap. */ diff --git a/apps/hubble/src/network/p2p/gossipNodeWorker.ts b/apps/hubble/src/network/p2p/gossipNodeWorker.ts index 6cb344757c..8861118e5c 100644 --- a/apps/hubble/src/network/p2p/gossipNodeWorker.ts +++ b/apps/hubble/src/network/p2p/gossipNodeWorker.ts @@ -1,5 +1,6 @@ import { parentPort, workerData } from "worker_threads"; -import { peerIdFromBytes } from "@libp2p/peer-id"; +import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id"; +import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types"; import { autoNAT } from "@libp2p/autonat"; import { identify } from "@libp2p/identify"; import { ping } from "@libp2p/ping"; @@ -35,7 +36,7 @@ import { Message, toFarcasterTime, } from "@farcaster/hub-nodejs"; -import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from "../../utils/p2p.js"; +import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo, parseAddress } from "../../utils/p2p.js"; import { createLibp2p, Libp2p } from "libp2p"; import { err, ok, Result, ResultAsync } from "neverthrow"; import { GossipSub, gossipsub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub"; @@ -159,11 +160,37 @@ export class LibP2PNode { ? parseInt(process.env["GOSSIPSUB_SOCKET_TIMEOUT"]) : 30000; + const directPeers = options.directPeers + ?.map((a) => parseAddress(a)) + .map((a) => { + if (a.isErr()) { + logger.warn( + { errorCode: a.error.errCode, message: a.error.message }, + "Couldn't parse direct peer address, ignoring", + ); + } else if (!a.value.getPeerId()) { + logger.warn( + { errorCode: "unavailable", message: "peer id missing from direct peer" }, + "Direct peer missing peer id, ignoring", + ); + } + + return a; + }) + .filter((a) => a.isOk() && a.value.getPeerId()) + .map((a) => a._unsafeUnwrap()) + .map((a) => { + return { + id: peerIdFromString(a.getPeerId() ?? ""), + addrs: [a], + } as AddrInfo; + }); + const gossip = gossipsub({ allowPublishToZeroTopicPeers: true, asyncValidation: true, // Do not forward messages until we've merged it (prevents forwarding known bad messages) canRelayMessage: true, - directPeers: options.directPeers || [], + directPeers: directPeers || [], emitSelf: false, fallbackToFloodsub: fallbackToFloodsub, floodPublish: floodPublish,