diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 4a315d7279..4c56a3d680 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,3 +1,5 @@ +import type { PeerId } from "@libp2p/interface"; + import type { IDecodedMessage, IDecoder } from "./message.js"; import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js"; import type { @@ -13,6 +15,7 @@ import type { IReceiver } from "./receiver.js"; export type SubscribeOptions = { keepAlive?: number; + pingsBeforePeerRenewed?: number; }; export type IFilter = IReceiver & IBaseProtocolCore; @@ -26,7 +29,7 @@ export interface ISubscriptionSDK { unsubscribe(contentTopics: ContentTopic[]): Promise; - ping(): Promise; + ping(peerId?: PeerId): Promise; unsubscribeAll(): Promise; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 9aeab47285..01368b7964 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,7 +25,7 @@ export type IBaseProtocolCore = { }; export type IBaseProtocolSDK = { - renewPeer: (peerToDisconnect: PeerId) => Promise; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly connectedPeers: Peer[]; readonly numPeersToUse: number; }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index b1a845df19..6013e9dfbe 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -42,22 +42,24 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { /** * Disconnects from a peer and tries to find a new one to replace it. * @param peerToDisconnect The peer to disconnect from. + * @returns The new peer that was found and connected to. */ - public async renewPeer(peerToDisconnect: PeerId): Promise { + public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - try { - await this.connectionManager.dropConnection(peerToDisconnect); - this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); - this.log.info( - `Peer ${peerToDisconnect} disconnected and removed from the peer list` - ); - - await this.findAndAddPeers(1); - } catch (error) { - this.log.info( - "Peer renewal failed, relying on the interval to find a new peer" + await this.connectionManager.dropConnection(peerToDisconnect); + this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); + this.log.info( + `Peer ${peerToDisconnect} disconnected and removed from the peer list` + ); + + const peer = (await this.findAndAddPeers(1))[0]; + if (!peer) { + throw new Error( + "Failed to find a new peer to replace the disconnected one" ); } + + return peer; } /** @@ -171,7 +173,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * Finds and adds new peers to the peers list. * @param numPeers The number of peers to find and add. */ - private async findAndAddPeers(numPeers: number): Promise { + private async findAndAddPeers(numPeers: number): Promise { this.log.info(`Finding and adding ${numPeers} new peers`); try { const additionalPeers = await this.findAdditionalPeers(numPeers); @@ -179,6 +181,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.log.info( `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` ); + return additionalPeers; } catch (error) { this.log.error("Error finding and adding new peers:", error); throw error; @@ -197,20 +200,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { try { let newPeers = await this.core.getPeers({ maxBootstrapPeers: 0, - numPeers: numPeers + numPeers: 0 }); if (newPeers.length === 0) { this.log.warn("No new peers found, trying with bootstrap peers"); newPeers = await this.core.getPeers({ maxBootstrapPeers: numPeers, - numPeers: numPeers + numPeers: 0 }); } - newPeers = newPeers.filter( - (peer) => this.peers.some((p) => p.id === peer.id) === false - ); + newPeers = newPeers + .filter((peer) => this.peers.some((p) => p.id === peer.id) === false) + .slice(0, numPeers); return newPeers; } catch (error) { this.log.error("Error finding additional peers:", error); diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index cd3db714e2..c10e9582db 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -1,4 +1,5 @@ import type { Peer } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, @@ -41,6 +42,8 @@ type SubscriptionCallback = { const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; +const DEFAULT_MAX_PINGS = 3; + const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: MINUTE }; @@ -48,6 +51,8 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; + private peerFailures: Map = new Map(); + private maxPingFailures: number = DEFAULT_MAX_PINGS; private subscriptionCallbacks: Map< ContentTopic, @@ -56,18 +61,21 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, - private peers: Peer[], - private protocol: FilterCore + private protocol: FilterCore, + private getPeers: () => Peer[], + private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); } - async subscribe( + public async subscribe( decoders: IDecoder | IDecoder[], callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { + this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription @@ -87,7 +95,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const promises = this.peers.map(async (peer) => + const promises = this.getPeers().map(async (peer) => this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) ); @@ -111,15 +119,17 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - if (options?.keepAlive) { - this.startKeepAlivePings(options.keepAlive); + if (options.keepAlive) { + this.startKeepAlivePings(options); } return finalResult; } - async unsubscribe(contentTopics: ContentTopic[]): Promise { - const promises = this.peers.map(async (peer) => { + public async unsubscribe( + contentTopics: ContentTopic[] + ): Promise { + const promises = this.getPeers().map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, peer, @@ -143,16 +153,17 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async ping(): Promise { - const promises = this.peers.map(async (peer) => this.protocol.ping(peer)); + public async ping(peerId?: PeerId): Promise { + const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id); + const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); const results = await Promise.allSettled(promises); return this.handleResult(results, "ping"); } - async unsubscribeAll(): Promise { - const promises = this.peers.map(async (peer) => + public async unsubscribeAll(): Promise { + const promises = this.getPeers().map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -217,31 +228,78 @@ export class SubscriptionManager implements ISubscriptionSDK { } } } + return result; + } - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + private async pingSpecificPeer(peerId: PeerId): Promise { + const peer = this.getPeers().find((p) => p.id.equals(peerId)); + if (!peer) { + return { + success: null, + failure: { + peerId, + error: ProtocolError.NO_PEER_AVAILABLE + } + }; + } - return result; + try { + const result = await this.protocol.ping(peer); + if (result.failure) { + await this.handlePeerFailure(peerId); + } else { + this.peerFailures.delete(peerId.toString()); + } + return result; + } catch (error) { + await this.handlePeerFailure(peerId); + return { + success: null, + failure: { + peerId, + error: ProtocolError.GENERIC_FAIL + } + }; + } + } + + private async handlePeerFailure(peerId: PeerId): Promise { + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this.renewAndSubscribePeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } } - private startKeepAlivePings(interval: number): void { + private async renewAndSubscribePeer(peerId: PeerId): Promise { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + + return newPeer; + } + + private startKeepAlivePings(options: SubscribeOptions): void { + const { keepAlive } = options; if (this.keepAliveTimer) { log.info("Recurring pings already set up."); return; } this.keepAliveTimer = setInterval(() => { - const run = async (): Promise => { - try { - log.info("Recurring ping to peers."); - await this.ping(); - } catch (error) { - log.error("Stopping recurring pings due to failure", error); - this.stopKeepAlivePings(); - } - }; - - void run(); - }, interval) as unknown as number; + void this.ping().catch((error) => { + log.error("Error in keep-alive ping cycle:", error); + }); + }, keepAlive) as unknown as number; } private stopKeepAlivePings(): void { @@ -345,7 +403,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol) + new SubscriptionManager( + pubsubTopic, + this.protocol, + () => this.connectedPeers, + this.renewPeer.bind(this) + ) ); return { diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index 0ae05056dd..da1dd62af6 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -86,7 +86,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } if (failure) { if (failure.peerId) { - await this.renewPeer(failure.peerId); + try { + await this.renewPeer(failure.peerId); + log.info("Renewed peer", failure.peerId.toString()); + } catch (error) { + log.error("Failed to renew peer", error); + } } failures.push(failure); diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 1d4354aeba..eaf1218458 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -1,10 +1,15 @@ -import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + ISubscriptionSDK, + LightNode +} from "@waku/interfaces"; import { createDecoder, createEncoder, DecodedMessage, utf8ToBytes } from "@waku/sdk"; +import { delay } from "@waku/utils"; import { expect } from "chai"; import { describe } from "mocha"; @@ -18,12 +23,21 @@ import { teardownNodesWithRedundancy } from "../filter/utils.js"; -//TODO: add unit tests, - describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; + let subscription: ISubscriptionSDK; + + const pubsubTopic = DefaultPubsubTopic; + const contentTopic = "/test"; + + const encoder = createEncoder({ + pubsubTopic, + contentTopic + }); + + const decoder = createDecoder(contentTopic, pubsubTopic); beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes( @@ -32,29 +46,19 @@ describe("Waku Filter: Peer Management: E2E", function () { undefined, 5 ); + const { error, subscription: sub } = + await waku.filter.createSubscription(pubsubTopic); + if (!sub || error) { + throw new Error("Could not create subscription"); + } + subscription = sub; }); afterEachCustom(this, async () => { await teardownNodesWithRedundancy(serviceNodes, waku); }); - const pubsubTopic = DefaultPubsubTopic; - const contentTopic = "/test"; - - const encoder = createEncoder({ - pubsubTopic, - contentTopic - }); - - const decoder = createDecoder(contentTopic, pubsubTopic); - it("Number of peers are maintained correctly", async function () { - const { error, subscription } = - await waku.filter.createSubscription(pubsubTopic); - if (!subscription || error) { - expect.fail("Could not create subscription"); - } - const messages: DecodedMessage[] = []; const { failures, successes } = await subscription.subscribe( [decoder], @@ -74,4 +78,103 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(failures.length).to.equal(0); } }); + + it("Ping succeeds for all connected peers", async function () { + await subscription.subscribe([decoder], () => {}); + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Ping fails for unsubscribed peers", async function () { + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(0); + expect(pingResult.failures.length).to.be.greaterThan(0); + }); + + it("Keep-alive pings maintain the connection", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + + await delay(1000); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Renews peer on consistent ping failures", async function () { + const maxPingFailures = 3; + await subscription.subscribe([decoder], () => {}, { + pingsBeforePeerRenewed: maxPingFailures + }); + + const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; + await waku.connectionManager.dropConnection(disconnectedNodePeerId); + + // Ping multiple times to exceed max failures + for (let i = 0; i <= maxPingFailures; i++) { + await subscription.ping(); + await delay(100); + } + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + expect( + waku.filter.connectedPeers.some((peer) => + peer.id.equals(disconnectedNodePeerId) + ) + ).to.eq(false); + }); + + it("Tracks peer failures correctly", async function () { + const maxPingFailures = 3; + await subscription.subscribe([decoder], () => {}, { + pingsBeforePeerRenewed: maxPingFailures + }); + + const targetPeer = waku.filter.connectedPeers[0]; + await waku.connectionManager.dropConnection(targetPeer.id); + + for (let i = 0; i < maxPingFailures; i++) { + await subscription.ping(targetPeer.id); + } + + // At this point, the peer should not be renewed yet + expect( + waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id)) + ).to.be.true; + + // One more failure should trigger renewal + await subscription.ping(targetPeer.id); + + expect( + waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id)) + ).to.be.false; + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + }); + + it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () { + for (let i = 0; i < 3; i++) { + await subscription.subscribe([decoder], () => {}); + let pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + + await subscription.unsubscribe([contentTopic]); + pingResult = await subscription.ping(); + expect(pingResult.failures.length).to.be.greaterThan(0); + } + + await subscription.subscribe([decoder], () => {}); + const finalPingResult = await subscription.ping(); + expect(finalPingResult.successes.length).to.equal( + waku.filter.numPeersToUse + ); + }); });