Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(hubble): improve libp2p sync #1912

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ jobs:
strategy:
matrix:
include:
- node_version: 18
- node_version: 21
runs_on: 'buildjet-8vcpu-ubuntu-2204'
- node_version: 20
- node_version: 21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep both 20 and 21 (but can remove 18) from the matrix

runs_on: 'buildjet-8vcpu-ubuntu-2204'
- node_version: 21
runs_on: 'buildjet-16vcpu-ubuntu-2204-arm'
Expand Down
22 changes: 11 additions & 11 deletions apps/hubble/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"test:ci": "yarn build:all && yarn test:rust --release && ENVIRONMENT=test NODE_OPTIONS=\"--experimental-vm-modules --max-old-space-size=4096\" jest --ci --forceExit --coverage -w 2"
},
"devDependencies": {
"@libp2p/interface-mocks": "^9.0.0",
"@types/async-lock": "^1.4.0",
"@types/chance": "^1.1.3",
"@types/cli-progress": "^3.11.0",
Expand All @@ -69,31 +68,32 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.400.0",
"@aws-sdk/client-sts": "^3.398.0",
"@aws-sdk/lib-storage": "^3.504.0",
"@chainsafe/libp2p-gossipsub": "6.1.0",
"@chainsafe/libp2p-noise": "^11.0.0 ",
"@chainsafe/libp2p-gossipsub": "13.0.0",
"@chainsafe/libp2p-noise": "^15.0.0",
"@faker-js/faker": "~7.6.0",
"@farcaster/hub-nodejs": "^0.11.9",
"@fastify/cors": "^8.4.0",
"@figma/hot-shots": "^9.0.0-figma.1",
"@grpc/grpc-js": "~1.8.21",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^2.0.1",
"@libp2p/mplex": "^7.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/tcp": "^6.0.0",
"@libp2p/utils": "^3.0.2",
"@libp2p/interface-compliance-tests": "^5.3.4",
"@libp2p/mplex": "^10.0.18",
"@libp2p/peer-id-factory": "^4.0.9",
"@libp2p/tcp": "^9.0.18",
"@libp2p/utils": "^5.2.8",
"@libp2p/mdns": "^10.0.20",
"@libp2p/kad-dht": "^12.0.13",
"@multiformats/multiaddr": "^11.0.0",
"@noble/curves": "^1.0.0",
"@noble/ed25519": "^2.1.0",
"abitype": "^0.8.3",
"async-lock": "^1.4.0",
"axios": "^1.6.0",
"bs58": "^5.0.0",
"cli-progress": "^3.12.0",
"commander": "~10.0.0",
"fastify": "^4.22.0",
"libp2p": "0.43.4",
"libp2p": "^1.3.2",
"neverthrow": "~6.0.0",
"node-cron": "~3.0.2",
"pino": "~8.11.0",
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { FarcasterNetwork, farcasterNetworkFromJSON } from "@farcaster/hub-nodejs";
import { peerIdFromString } from "@libp2p/peer-id";
import { PeerId } from "@libp2p/interface-peer-id";
import { PeerId } from "@libp2p/interface";
import { createEd25519PeerId, createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { Command } from "commander";
Expand Down Expand Up @@ -313,7 +313,7 @@ app
}

// Read PeerID from 1. CLI option, 2. Environment variable, 3. Config file
let peerId;
let peerId: PeerId;
if (cliOptions.id) {
const peerIdR = await ResultAsync.fromPromise(readPeerId(resolve(cliOptions.id)), (e) => e);
if (peerIdR.isErr()) {
Expand Down
22 changes: 17 additions & 5 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import {
validations,
HashScheme,
} from "@farcaster/hub-nodejs";
import { PeerId } from "@libp2p/interface-peer-id";
import { Connection, PeerId } from "@libp2p/interface";
import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id";
import { publicAddressesFirst } from "@libp2p/utils/address-sort";
import { unmarshalPrivateKey, unmarshalPublicKey } from "@libp2p/crypto/keys";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { Result, ResultAsync, err, ok } from "neverthrow";
import { GossipNode, MAX_MESSAGE_QUEUE_SIZE, GOSSIP_SEEN_TTL } from "./network/p2p/gossipNode.js";
import { GossipNode, MAX_MESSAGE_QUEUE_SIZE, GOSSIP_SEEN_TTL, GossipNodeConfig } from "./network/p2p/gossipNode.js";
import { PeriodicSyncJobScheduler } from "./network/sync/periodicSyncJob.js";
import SyncEngine from "./network/sync/syncEngine.js";
import AdminServer from "./rpc/adminServer.js";
Expand Down Expand Up @@ -416,7 +416,11 @@ export class Hub implements HubInterface {
}

this.rocksDB = new RocksDB(options.rocksDBName ? options.rocksDBName : randomDbName());
this.gossipNode = new GossipNode(this.rocksDB, this.options.network);
const gossipNodeConfig: GossipNodeConfig = {
db: this.rocksDB,
network: this.options.network,
};
this.gossipNode = new GossipNode(gossipNodeConfig);

const eventHandler = new StoreEventHandler(this.rocksDB, {
lockMaxPending: options.commitLockMaxPending,
Expand Down Expand Up @@ -1583,18 +1587,26 @@ export class Hub implements HubInterface {
);
});

this.gossipNode.on("peerConnect", async () => {
this.gossipNode.on("peerConnect", async (details) => {
// When we connect to a new node, gossip out our contact info 1 second later.
// The setTimeout is to ensure that we have a chance to receive the peer's info properly.
setTimeout(async () => {
await this.gossipContactInfo();
}, 1 * 1000);

if (details.remotePeer && details.remoteAddr) {
await this.gossipNode.addPeerToAddressBook(details.remotePeer, details.remoteAddr);
}
statsd().increment("peer_connect.count");
});

this.gossipNode.on("peerDisconnect", async (connection) => {
// Remove this peer's connection
this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString());
if (connection.remotePeer) {
this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString());
} else {
logger.warn("Peer disconnected without remote peer", connection.id);
}
statsd().increment("peer_disconnect.count");
});
}
Expand Down
10 changes: 5 additions & 5 deletions apps/hubble/src/network/p2p/connectionFilter.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { mockMultiaddrConnPair } from "@libp2p/interface-mocks";
import { PeerId } from "@libp2p/interface-peer-id";
import { mockMultiaddrConnPair } from "@libp2p/interface-compliance-tests/mocks";
import { PeerId } from "@libp2p/interface";
import { createEd25519PeerId } from "@libp2p/peer-id-factory";
import { multiaddr } from "@multiformats/multiaddr";
import { ConnectionFilter } from "./connectionFilter.js";
Expand Down Expand Up @@ -28,7 +28,7 @@ describe("connectionFilter tests", () => {
remotePeer: allowedPeerId,
});
await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeTruthy();
await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeTruthy();
await expect(filter.denyDialMultiaddr(multiaddr(allowedMultiAddrStr))).resolves.toBeTruthy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeTruthy();
Expand All @@ -46,7 +46,7 @@ describe("connectionFilter tests", () => {
remotePeer: allowedPeerId,
});
await expect(filter.denyDialPeer(allowedPeerId)).resolves.toBeFalsy();
await expect(filter.denyDialMultiaddr(allowedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeFalsy();
await expect(filter.denyDialMultiaddr(multiaddr(allowedMultiAddrStr))).resolves.toBeFalsy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyInboundEncryptedConnection(allowedPeerId, remoteConnection)).resolves.toBeFalsy();
Expand Down Expand Up @@ -92,7 +92,7 @@ describe("connectionFilter tests", () => {
remotePeer: blockedPeerId,
});
await expect(filter.denyDialPeer(blockedPeerId)).resolves.toBeTruthy();
await expect(filter.denyDialMultiaddr(blockedPeerId, multiaddr(allowedMultiAddrStr))).resolves.toBeTruthy();
await expect(filter.denyDialMultiaddr(multiaddr(filteredMultiAddrStr))).resolves.toBeTruthy();
// Incepient Inbound Connections are always allowed
await expect(filter.denyInboundConnection(remoteConnection)).resolves.toBeFalsy();
await expect(filter.denyOutboundConnection(blockedPeerId, remoteConnection)).resolves.toBeTruthy();
Expand Down
13 changes: 5 additions & 8 deletions apps/hubble/src/network/p2p/connectionFilter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ConnectionGater, MultiaddrConnection } from "@libp2p/interface-connection";
import { PeerId } from "@libp2p/interface-peer-id";
import { ConnectionGater, MultiaddrConnection } from "@libp2p/interface";
import { PeerId } from "@libp2p/interface";
import { Multiaddr } from "@multiformats/multiaddr";
import { logger } from "../../utils/logger.js";

Expand Down Expand Up @@ -42,12 +42,9 @@ export class ConnectionFilter implements ConnectionGater {
return deny;
};

denyDialMultiaddr = async (peerId: PeerId, _multiaddr: Multiaddr): Promise<boolean> => {
const deny = this.shouldDeny(peerId.toString());
if (deny) {
log.info({ peerId, filter: "denyDialMultiaddr" }, "denied a connection");
}
return deny;
denyDialMultiaddr = async (_multiaddr: Multiaddr): Promise<boolean> => {
const peerID = _multiaddr.getPeerId();
return peerID !== null && this.shouldDeny(peerID.toString());
};

denyInboundConnection = async (_maConn: MultiaddrConnection): Promise<boolean> => {
Expand Down
22 changes: 14 additions & 8 deletions apps/hubble/src/network/p2p/gossipNode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ import {
OnChainEvent,
} from "@farcaster/hub-nodejs";
import { multiaddr } from "@multiformats/multiaddr/";
import { GossipNode } from "./gossipNode.js";
import { GossipNode, GossipNodeConfig } from "./gossipNode.js";
import Server from "../../rpc/server.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import { MockHub } from "../../test/mocks.js";
import SyncEngine from "../sync/syncEngine.js";
import { PeerId } from "@libp2p/interface-peer-id";
import { PeerId } from "@libp2p/interface";
import { sleepWhile } from "../../utils/crypto.js";
import { createEd25519PeerId } from "@libp2p/peer-id-factory";
import { LibP2PNode } from "./gossipNodeWorker.js";

const TEST_TIMEOUT_SHORT = 10 * 1000;
const TEST_TIMEOUT_LONG = 30 * 1000;
const db = jestRocksDB("network.p2p.gossipNode.test");
const config: GossipNodeConfig = {
network: FarcasterNetwork.DEVNET,
};

describe("GossipNode", () => {
let node: GossipNode;

beforeEach(() => {
node = new GossipNode();
node = new GossipNode(config);
});

afterEach(async () => {
Expand Down Expand Up @@ -61,7 +64,7 @@ describe("GossipNode", () => {
let result = await node.connectAddress(multiaddr());
expect(result.isErr()).toBeTruthy();

const offlineNode = new GossipNode();
const offlineNode = new GossipNode(config);
result = await node.connect(offlineNode);
expect(result.isErr()).toBeTruthy();
});
Expand All @@ -71,11 +74,11 @@ describe("GossipNode", () => {
async () => {
expect((await node.start([])).isOk()).toBeTruthy();

const node2 = new GossipNode();
const node2 = new GossipNode(config);
expect((await node2.start([])).isOk()).toBeTruthy();

// node 3 has node 1 in its allow list, but not node 2
const node3 = new GossipNode();
const node3 = new GossipNode(config);

if (node.peerId()) {
expect((await node3.start([], { allowedPeerIdStrs: [node.peerId()?.toString() ?? ""] })).isOk()).toBeTruthy();
Expand Down Expand Up @@ -105,7 +108,10 @@ describe("GossipNode", () => {
async () => {
await node.start([]);

const node2 = new GossipNode();
const config: GossipNodeConfig = {
network: FarcasterNetwork.DEVNET,
};
const node2 = new GossipNode(config);
await node2.start([]);

try {
Expand Down Expand Up @@ -138,7 +144,7 @@ describe("GossipNode", () => {
);

describe("gossip messages", () => {
const network = FarcasterNetwork.TESTNET;
const network = config.network ?? FarcasterNetwork.DEVNET;
const fid = Factories.Fid.build();
const signer = Factories.Ed25519Signer.build();
const custodySigner = Factories.Eip712Signer.build();
Expand Down
43 changes: 29 additions & 14 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PublishResult } from "@libp2p/interface-pubsub";
import { PublishResult } from "@libp2p/interface";
import { Worker } from "worker_threads";
import {
ContactInfoContent,
Expand All @@ -12,8 +12,8 @@ import {
Message,
MessageBundle,
} from "@farcaster/hub-nodejs";
import { Connection } from "@libp2p/interface-connection";
import { PeerId } from "@libp2p/interface-peer-id";
import { Connection } from "@libp2p/interface";
import { PeerId } from "@libp2p/interface";
import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id";
import { multiaddr, Multiaddr } from "@multiformats/multiaddr";
import { err, ok, Result } from "neverthrow";
Expand Down Expand Up @@ -149,6 +149,11 @@ export type LibP2PNodeMethodGenericMessage = {
};
}[LibP2PInterfaceMethodNames];

export interface GossipNodeConfig {
db?: RocksDB;
network?: FarcasterNetwork;
}

/**
* A GossipNode allows a Hubble instance to connect and gossip messages to its peers.
*
Expand Down Expand Up @@ -179,11 +184,11 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
private _multiaddrs?: Multiaddr[];
private _isStarted = false;

constructor(db?: RocksDB, network?: FarcasterNetwork) {
constructor(config: GossipNodeConfig = {}) {
super();

this._db = db;
this._network = network ?? FarcasterNetwork.NONE;
this._db = config.db;
this._network = config.network ?? FarcasterNetwork.NONE;

// Create a worker thread to run the libp2p node. The path is relative to the current file
// We use the "../../../" so that it works when running tests from the root directory
Expand Down Expand Up @@ -498,12 +503,14 @@ export class GossipNode extends TypedEmitter<NodeEvents> {

async registerListeners() {
this._nodeEvents?.addListener("peer:connect", (detail) => {
// console.log("Peer Connected", JSON.stringify(detail, null, 2));
log.info(
{
peer: detail.remotePeer,
addrs: detail.remoteAddr,
type: detail.stat.direction,
type: detail.direction,
detail: JSON.stringify(detail, null, 2),
detail_remote: detail.remotePeer,
detail_remoteAddr: detail.remoteAddr,
},
"P2P Connection established",
);
Expand All @@ -512,9 +519,13 @@ export class GossipNode extends TypedEmitter<NodeEvents> {

// When we successfully connect to a peer, we store it in the DB, so we can connect to it again
// if we restart
this.putPeerAddrToDB(detail.remotePeer.toString(), detail.remoteAddr.toString());
if (detail.remotePeer && detail.remoteAddr) {
this.putPeerAddrToDB(detail.remotePeer.toString(), detail.remoteAddr.toString());
} else {
log.warn("No peerId or address in connection details");
}
});
this._nodeEvents?.addListener("peer:disconnect", (detail) => {
this._nodeEvents?.addListener("peer:disconnect", (detail: Connection) => {
log.info({ peer: detail.remotePeer }, "P2P Connection disconnected");
this.emit("peerDisconnect", detail);
this.updateStatsdPeerGauges();
Expand Down Expand Up @@ -566,17 +577,21 @@ export class GossipNode extends TypedEmitter<NodeEvents> {

registerDebugListeners() {
this._nodeEvents?.addListener("peer:discovery", (detail) => {
log.info({ identity: this.identity }, `Found peer: ${detail.multiaddrs} }`);
// log.info({ identity: this.identity }, `Found peer: ${detail.multiaddrs} }`);
});
this._nodeEvents?.addListener("peer:connect", (detail) => {
log.info({ identity: this.identity }, `Connection established to: ${detail.remotePeer.toString()}`);
// log.info({ identity: this.identity }, `Connection established to: ${detail.remotePeer.toString()}`);
});
this._nodeEvents?.addListener("peer:disconnect", (detail) => {
log.info({ identity: this.identity }, `Disconnected from: ${detail.remotePeer.toString()} `);
// const conn: Connection = detail;
// log.info({
// peer: conn.remotePeer ? conn.remotePeer.toString() : "unknown-remote-peer",
// identity: this.identity
// }, `Disconnected connection with id ${conn.id} `);
});
this._nodeEvents?.addListener("message", (detail) => {
log.info(
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
{ identity: this.identity, from: (detail as any)["from"] },
`Received message for topic: ${detail.topic}`,
);
Expand Down
Loading
Loading