From a24abfe5f3495c363758a041b6e4c34a7aa6f37d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 24 Jun 2024 14:50:34 -0400 Subject: [PATCH 01/15] chore: renewPeer() returns the new found peer --- packages/interfaces/src/protocols.ts | 2 +- packages/sdk/src/protocols/base_protocol.ts | 34 +++++++++------------ 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 9aeab47285..01368b7964 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,7 +25,7 @@ export type IBaseProtocolCore = { }; export type IBaseProtocolSDK = { - renewPeer: (peerToDisconnect: PeerId) => Promise; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly connectedPeers: Peer[]; readonly numPeersToUse: number; }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index b1a845df19..7f2240fb94 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -42,22 +42,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { /** * Disconnects from a peer and tries to find a new one to replace it. * @param peerToDisconnect The peer to disconnect from. + * @returns The new peer that was found and connected to. */ - public async renewPeer(peerToDisconnect: PeerId): Promise { + public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - try { - await this.connectionManager.dropConnection(peerToDisconnect); - this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); - this.log.info( - `Peer ${peerToDisconnect} disconnected and removed from the peer list` - ); + await this.connectionManager.dropConnection(peerToDisconnect); + this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); + this.log.info( + `Peer ${peerToDisconnect} disconnected and removed from the peer list` + ); - await this.findAndAddPeers(1); - } catch (error) { - this.log.info( - "Peer renewal failed, relying on the interval to find a new peer" - ); - } + return (await this.findAndAddPeers(1))[0]; } /** @@ -171,7 +166,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * Finds and adds new peers to the peers list. * @param numPeers The number of peers to find and add. */ - private async findAndAddPeers(numPeers: number): Promise { + private async findAndAddPeers(numPeers: number): Promise { this.log.info(`Finding and adding ${numPeers} new peers`); try { const additionalPeers = await this.findAdditionalPeers(numPeers); @@ -179,6 +174,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.log.info( `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` ); + return additionalPeers; } catch (error) { this.log.error("Error finding and adding new peers:", error); throw error; @@ -197,20 +193,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { try { let newPeers = await this.core.getPeers({ maxBootstrapPeers: 0, - numPeers: numPeers + numPeers: 0 }); if (newPeers.length === 0) { this.log.warn("No new peers found, trying with bootstrap peers"); newPeers = await this.core.getPeers({ maxBootstrapPeers: numPeers, - numPeers: numPeers + numPeers: 0 }); } - newPeers = newPeers.filter( - (peer) => this.peers.some((p) => p.id === peer.id) === false - ); + newPeers = newPeers + .filter((peer) => this.peers.some((p) => p.id === peer.id) === false) + .slice(0, numPeers); return newPeers; } catch (error) { this.log.error("Error finding additional peers:", error); From e5229426018091ed0fc235b2bd13cb2c4046a24e Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 24 Jun 2024 15:07:13 -0400 Subject: [PATCH 02/15] feat: ping & peer renewal --- packages/interfaces/src/filter.ts | 1 + packages/sdk/src/protocols/filter.ts | 106 ++++++++++++++++++++++----- 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 4a315d7279..2446ce08a9 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -13,6 +13,7 @@ import type { IReceiver } from "./receiver.js"; export type SubscribeOptions = { keepAlive?: number; + pingsBeforePeerRenewed?: number; }; export type IFilter = IReceiver & IBaseProtocolCore; diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index cd3db714e2..ed26f5d66b 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -1,4 +1,5 @@ import type { Peer } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, @@ -42,7 +43,8 @@ const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: MINUTE + keepAlive: MINUTE, + pingsBeforePeerRenewed: 3 }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; @@ -56,8 +58,9 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, + private protocol: FilterCore, private peers: Peer[], - private protocol: FilterCore + private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -111,8 +114,8 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - if (options?.keepAlive) { - this.startKeepAlivePings(options.keepAlive); + if (options.keepAlive) { + this.startKeepAlivePings(options); } return finalResult; @@ -143,8 +146,31 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async ping(): Promise { - const promises = this.peers.map(async (peer) => this.protocol.ping(peer)); + async ping(peerId?: PeerId): Promise { + if (peerId) { + const peer = this.peers.find((p) => p.id.equals(peerId)); + if (!peer) { + log.warn( + "Peer not found in connected peers. Looks like the peer is already disconnected." + ); + return { + failures: [ + { + peerId, + error: ProtocolError.NO_PEER_AVAILABLE + } + ], + successes: [] + }; + } + + const { failure, success } = await this.protocol.ping(peer); + return failure + ? { failures: [failure], successes: [] } + : { failures: [], successes: [success] }; + } + + const promises = this.peers.map((peer) => this.protocol.ping(peer)); const results = await Promise.allSettled(promises); @@ -223,25 +249,62 @@ export class SubscriptionManager implements ISubscriptionSDK { return result; } - private startKeepAlivePings(interval: number): void { + private startKeepAlivePings(options: SubscribeOptions): void { + const { keepAlive, pingsBeforePeerRenewed = 1 } = options; if (this.keepAliveTimer) { log.info("Recurring pings already set up."); return; } - this.keepAliveTimer = setInterval(() => { - const run = async (): Promise => { - try { - log.info("Recurring ping to peers."); - await this.ping(); - } catch (error) { - log.error("Stopping recurring pings due to failure", error); - this.stopKeepAlivePings(); + const pingCycle = async (): Promise => { + log.info("Starting keep-alive ping cycle"); + const { failures } = await this.ping(); + for (const failure of failures) { + if (failure.peerId) { + let attempts = pingsBeforePeerRenewed; + while (attempts > 0) { + const { failures: pingFailures } = await this.ping(failure.peerId); + if (pingFailures.length === 0) { + log.info( + `Successfully pinged peer ${failure.peerId.toString()} after failure.` + ); + break; + } + log.error( + `Failed to ping peer ${failure.peerId.toString()}, retrying.` + ); + attempts--; + } + if (attempts === 0) { + log.error( + `Failed to ping peer ${failure.peerId.toString()} after multiple attempts. Renewing peer.` + ); + try { + const newPeer = await this.renewPeer(failure.peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + log.info( + `Renewed and resubscribed peer ${newPeer.id.toString()}` + ); + } catch (error) { + log.error( + `Failed to renew peer ${failure.peerId.toString()}: ${error}` + ); + } + } } - }; + } + log.info("Finished keep-alive ping cycle"); + }; - void run(); - }, interval) as unknown as number; + this.keepAliveTimer = setInterval(() => { + void pingCycle().catch((error) => { + log.error("Error in keep-alive ping cycle:", error); + }); + }, keepAlive) as unknown as number; } private stopKeepAlivePings(): void { @@ -345,7 +408,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol) + new SubscriptionManager( + pubsubTopic, + this.protocol, + this.connectedPeers, + this.renewPeer.bind(this) + ) ); return { From 04f806789247dad0dd65efc4253d619fb9003ec9 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 24 Jun 2024 15:11:31 -0400 Subject: [PATCH 03/15] chore: add tests --- .../tests/filter/peer_management.spec.ts | 105 +++++++++++++++--- 1 file changed, 87 insertions(+), 18 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 1d4354aeba..758817d29d 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -1,10 +1,15 @@ -import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + ISubscriptionSDK, + LightNode +} from "@waku/interfaces"; import { createDecoder, createEncoder, DecodedMessage, utf8ToBytes } from "@waku/sdk"; +import { delay } from "@waku/utils"; import { expect } from "chai"; import { describe } from "mocha"; @@ -21,9 +26,20 @@ import { //TODO: add unit tests, describe("Waku Filter: Peer Management: E2E", function () { - this.timeout(15000); + this.timeout(30000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; + let subscription: ISubscriptionSDK; + + const pubsubTopic = DefaultPubsubTopic; + const contentTopic = "/test"; + + const encoder = createEncoder({ + pubsubTopic, + contentTopic + }); + + const decoder = createDecoder(contentTopic, pubsubTopic); beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes( @@ -32,29 +48,19 @@ describe("Waku Filter: Peer Management: E2E", function () { undefined, 5 ); + const { error, subscription: sub } = + await waku.filter.createSubscription(pubsubTopic); + if (!sub || error) { + throw new Error("Could not create subscription"); + } + subscription = sub; }); afterEachCustom(this, async () => { await teardownNodesWithRedundancy(serviceNodes, waku); }); - const pubsubTopic = DefaultPubsubTopic; - const contentTopic = "/test"; - - const encoder = createEncoder({ - pubsubTopic, - contentTopic - }); - - const decoder = createDecoder(contentTopic, pubsubTopic); - it("Number of peers are maintained correctly", async function () { - const { error, subscription } = - await waku.filter.createSubscription(pubsubTopic); - if (!subscription || error) { - expect.fail("Could not create subscription"); - } - const messages: DecodedMessage[] = []; const { failures, successes } = await subscription.subscribe( [decoder], @@ -74,4 +80,67 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(failures.length).to.equal(0); } }); + + it("Ping succeeds for all connected peers", async function () { + await subscription.subscribe([decoder], () => {}); + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Ping fails for unsubscribed peers", async function () { + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(0); + expect(pingResult.failures.length).to.be.greaterThan(0); + }); + + it("Keep-alive pings maintain the connection", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + + await delay(1000); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + }); + + it("Renews peer on consistent ping failures", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + + const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; + await waku.connectionManager.dropConnection(disconnectedNodePeerId); + + await delay(1000); + + const pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + expect(pingResult.failures.length).to.equal(0); + + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + expect( + waku.filter.connectedPeers.some((peer) => + peer.id.equals(disconnectedNodePeerId) + ) + ).to.eq(false); + }); + + it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () { + for (let i = 0; i < 3; i++) { + await subscription.subscribe([decoder], () => {}); + let pingResult = await subscription.ping(); + expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); + + await subscription.unsubscribe([contentTopic]); + pingResult = await subscription.ping(); + expect(pingResult.failures.length).to.be.greaterThan(0); + } + + await subscription.subscribe([decoder], () => {}); + const finalPingResult = await subscription.ping(); + expect(finalPingResult.successes.length).to.equal( + waku.filter.numPeersToUse + ); + }); }); From 37f4d1bdd001e9d0a033b3ef5a35466e0a91ce18 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 15:52:06 +0530 Subject: [PATCH 04/15] fix: tests --- packages/sdk/src/protocols/filter.ts | 15 ++++++++------- .../tests/tests/filter/peer_management.spec.ts | 6 +++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index ed26f5d66b..9510a1bcea 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -59,7 +59,8 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, private protocol: FilterCore, - private peers: Peer[], + // private peers: Peer[], + private getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { this.pubsubTopic = pubsubTopic; @@ -90,7 +91,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const promises = this.peers.map(async (peer) => + const promises = this.getPeers().map(async (peer) => this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) ); @@ -122,7 +123,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribe(contentTopics: ContentTopic[]): Promise { - const promises = this.peers.map(async (peer) => { + const promises = this.getPeers().map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, peer, @@ -148,7 +149,7 @@ export class SubscriptionManager implements ISubscriptionSDK { async ping(peerId?: PeerId): Promise { if (peerId) { - const peer = this.peers.find((p) => p.id.equals(peerId)); + const peer = this.getPeers().find((p) => p.id.equals(peerId)); if (!peer) { log.warn( "Peer not found in connected peers. Looks like the peer is already disconnected." @@ -170,7 +171,7 @@ export class SubscriptionManager implements ISubscriptionSDK { : { failures: [], successes: [success] }; } - const promises = this.peers.map((peer) => this.protocol.ping(peer)); + const promises = this.getPeers().map((peer) => this.protocol.ping(peer)); const results = await Promise.allSettled(promises); @@ -178,7 +179,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribeAll(): Promise { - const promises = this.peers.map(async (peer) => + const promises = this.getPeers().map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -411,7 +412,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { new SubscriptionManager( pubsubTopic, this.protocol, - this.connectedPeers, + () => this.connectedPeers, this.renewPeer.bind(this) ) ); diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 758817d29d..3b0e387301 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -104,13 +104,13 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(pingResult.failures.length).to.equal(0); }); - it("Renews peer on consistent ping failures", async function () { - await subscription.subscribe([decoder], () => {}, { keepAlive: 100 }); + it.only("Renews peer on consistent ping failures", async function () { + await subscription.subscribe([decoder], () => {}, { keepAlive: 300 }); const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; await waku.connectionManager.dropConnection(disconnectedNodePeerId); - await delay(1000); + await delay(5700); const pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); From cf6e67a0fb45beea89bffb18b9fb0b1797bcfaa3 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 15:56:40 +0530 Subject: [PATCH 05/15] chore: remove only --- packages/tests/tests/filter/peer_management.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 3b0e387301..b4636e7ef9 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -104,13 +104,13 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(pingResult.failures.length).to.equal(0); }); - it.only("Renews peer on consistent ping failures", async function () { + it("Renews peer on consistent ping failures", async function () { await subscription.subscribe([decoder], () => {}, { keepAlive: 300 }); const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; await waku.connectionManager.dropConnection(disconnectedNodePeerId); - await delay(5700); + await delay(1000); const pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); From b985d267b7da00f7c9dcd69d2c70be9a0a65014a Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 16:11:31 +0530 Subject: [PATCH 06/15] chore: remove comments --- packages/sdk/src/protocols/filter.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 9510a1bcea..2446fd83ab 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -59,7 +59,6 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, private protocol: FilterCore, - // private peers: Peer[], private getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { From 515b975e707c392d0fa949b4c924e4ffcff5865b Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 18:57:13 +0530 Subject: [PATCH 07/15] chore(tests): decrease timeout --- packages/tests/tests/filter/peer_management.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index b4636e7ef9..4894e9e9b5 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -25,8 +25,8 @@ import { //TODO: add unit tests, -describe("Waku Filter: Peer Management: E2E", function () { - this.timeout(30000); +describe.only("Waku Filter: Peer Management: E2E", function () { + this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; let subscription: ISubscriptionSDK; From e6569fb9668dc87ad23bcde95d823170163d4ca1 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 19:08:01 +0530 Subject: [PATCH 08/15] chore: add array index validation --- packages/sdk/src/protocols/base_protocol.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 7f2240fb94..6013e9dfbe 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -52,7 +52,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); - return (await this.findAndAddPeers(1))[0]; + const peer = (await this.findAndAddPeers(1))[0]; + if (!peer) { + throw new Error( + "Failed to find a new peer to replace the disconnected one" + ); + } + + return peer; } /** From e060a63bc5029a439114a6188b4d77062bf51f53 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 20:27:55 +0530 Subject: [PATCH 09/15] chore: remove only --- packages/tests/tests/filter/peer_management.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 4894e9e9b5..7a666e2348 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -25,7 +25,7 @@ import { //TODO: add unit tests, -describe.only("Waku Filter: Peer Management: E2E", function () { +describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; From d52851459dea0ac80518ebe99211d294f77d6aed Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 8 Jul 2024 13:27:31 +0530 Subject: [PATCH 10/15] chore: move defaults into a separate variable --- packages/sdk/src/protocols/filter.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 2446fd83ab..2ce91ee882 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -42,9 +42,10 @@ type SubscriptionCallback = { const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; +const DEFAULT_PINGS = 3; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: MINUTE, - pingsBeforePeerRenewed: 3 + pingsBeforePeerRenewed: DEFAULT_PINGS }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; From 6a4888f98a113d5b6baa4c1fad06526e7fe5692d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 8 Jul 2024 13:31:07 +0530 Subject: [PATCH 11/15] chore: update lightpush with new API --- packages/sdk/src/protocols/filter.ts | 1 + packages/sdk/src/protocols/light_push.ts | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 2ce91ee882..1c56e2854f 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -43,6 +43,7 @@ const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; const DEFAULT_PINGS = 3; + const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: MINUTE, pingsBeforePeerRenewed: DEFAULT_PINGS diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index 0ae05056dd..da1dd62af6 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -86,7 +86,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } if (failure) { if (failure.peerId) { - await this.renewPeer(failure.peerId); + try { + await this.renewPeer(failure.peerId); + log.info("Renewed peer", failure.peerId.toString()); + } catch (error) { + log.error("Failed to renew peer", error); + } } failures.push(failure); From 13dd02590d226f9a8aeb5326ca0707992cf0b363 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 9 Jul 2024 18:00:03 +0530 Subject: [PATCH 12/15] chore: include peer renewals within `ping` instead of `interval` --- packages/sdk/src/protocols/filter.ts | 154 +++++++++++++-------------- 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 1c56e2854f..fc366a1331 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -42,16 +42,17 @@ type SubscriptionCallback = { const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; -const DEFAULT_PINGS = 3; +const DEFAULT_MAX_PINGS = 3; const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: MINUTE, - pingsBeforePeerRenewed: DEFAULT_PINGS + keepAlive: MINUTE }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; + private peerFailures: Map = new Map(); + private maxPingFailures: number = DEFAULT_MAX_PINGS; private subscriptionCallbacks: Map< ContentTopic, @@ -68,11 +69,13 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks = new Map(); } - async subscribe( + public async subscribe( decoders: IDecoder | IDecoder[], callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { + this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription @@ -123,7 +126,9 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async unsubscribe(contentTopics: ContentTopic[]): Promise { + public async unsubscribe( + contentTopics: ContentTopic[] + ): Promise { const promises = this.getPeers().map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, @@ -148,38 +153,23 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async ping(peerId?: PeerId): Promise { + public async ping(peerId?: PeerId): Promise { if (peerId) { - const peer = this.getPeers().find((p) => p.id.equals(peerId)); - if (!peer) { - log.warn( - "Peer not found in connected peers. Looks like the peer is already disconnected." - ); - return { - failures: [ - { - peerId, - error: ProtocolError.NO_PEER_AVAILABLE - } - ], - successes: [] - }; - } - - const { failure, success } = await this.protocol.ping(peer); - return failure - ? { failures: [failure], successes: [] } - : { failures: [], successes: [success] }; + const result = await this.pingSpecificPeer(peerId); + return result.failure + ? { failures: [result.failure], successes: [] } + : { failures: [], successes: [result.success!] }; } - const promises = this.getPeers().map((peer) => this.protocol.ping(peer)); - + const promises = this.getPeers().map((peer) => + this.pingSpecificPeer(peer.id) + ); const results = await Promise.allSettled(promises); return this.handleResult(results, "ping"); } - async unsubscribeAll(): Promise { + public async unsubscribeAll(): Promise { const promises = this.getPeers().map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -245,65 +235,75 @@ export class SubscriptionManager implements ISubscriptionSDK { } } } + return result; + } - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + private async pingSpecificPeer(peerId: PeerId): Promise { + const peer = this.getPeers().find((p) => p.id.equals(peerId)); + if (!peer) { + return { + success: null, + failure: { + peerId, + error: ProtocolError.NO_PEER_AVAILABLE + } + }; + } - return result; + try { + const result = await this.protocol.ping(peer); + if (result.failure) { + await this.handlePeerFailure(peerId); + } else { + this.peerFailures.delete(peerId.toString()); + } + return result; + } catch (error) { + await this.handlePeerFailure(peerId); + return { + success: null, + failure: { + peerId, + error: ProtocolError.GENERIC_FAIL + } + }; + } + } + + private async handlePeerFailure(peerId: PeerId): Promise { + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this._renewPeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } + } + + private async _renewPeer(peerId: PeerId): Promise { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); + + return newPeer; } private startKeepAlivePings(options: SubscribeOptions): void { - const { keepAlive, pingsBeforePeerRenewed = 1 } = options; + const { keepAlive } = options; if (this.keepAliveTimer) { log.info("Recurring pings already set up."); return; } - const pingCycle = async (): Promise => { - log.info("Starting keep-alive ping cycle"); - const { failures } = await this.ping(); - for (const failure of failures) { - if (failure.peerId) { - let attempts = pingsBeforePeerRenewed; - while (attempts > 0) { - const { failures: pingFailures } = await this.ping(failure.peerId); - if (pingFailures.length === 0) { - log.info( - `Successfully pinged peer ${failure.peerId.toString()} after failure.` - ); - break; - } - log.error( - `Failed to ping peer ${failure.peerId.toString()}, retrying.` - ); - attempts--; - } - if (attempts === 0) { - log.error( - `Failed to ping peer ${failure.peerId.toString()} after multiple attempts. Renewing peer.` - ); - try { - const newPeer = await this.renewPeer(failure.peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - log.info( - `Renewed and resubscribed peer ${newPeer.id.toString()}` - ); - } catch (error) { - log.error( - `Failed to renew peer ${failure.peerId.toString()}: ${error}` - ); - } - } - } - } - log.info("Finished keep-alive ping cycle"); - }; - this.keepAliveTimer = setInterval(() => { - void pingCycle().catch((error) => { + void this.ping().catch((error) => { log.error("Error in keep-alive ping cycle:", error); }); }, keepAlive) as unknown as number; From e69ac454e1588525bdb2f73c636d0f314f3c1f79 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 9 Jul 2024 18:02:41 +0530 Subject: [PATCH 13/15] chore: update tests --- .../tests/tests/filter/peer_management.spec.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 7a666e2348..dd3fd739b6 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -23,9 +23,7 @@ import { teardownNodesWithRedundancy } from "../filter/utils.js"; -//TODO: add unit tests, - -describe("Waku Filter: Peer Management: E2E", function () { +describe.only("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; @@ -105,12 +103,19 @@ describe("Waku Filter: Peer Management: E2E", function () { }); it("Renews peer on consistent ping failures", async function () { - await subscription.subscribe([decoder], () => {}, { keepAlive: 300 }); + const maxPingFailures = 3; + await subscription.subscribe([decoder], () => {}, { + pingsBeforePeerRenewed: maxPingFailures + }); const disconnectedNodePeerId = waku.filter.connectedPeers[0].id; await waku.connectionManager.dropConnection(disconnectedNodePeerId); - await delay(1000); + // Ping multiple times to exceed max failures + for (let i = 0; i <= maxPingFailures; i++) { + await subscription.ping(); + await delay(100); + } const pingResult = await subscription.ping(); expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse); From 438ac03b1827fd05608c2a2de19ddb3148f1cddb Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 9 Jul 2024 18:06:48 +0530 Subject: [PATCH 14/15] chore: add new test --- packages/interfaces/src/filter.ts | 4 ++- .../tests/filter/peer_management.spec.ts | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 2446ce08a9..4c56a3d680 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,3 +1,5 @@ +import type { PeerId } from "@libp2p/interface"; + import type { IDecodedMessage, IDecoder } from "./message.js"; import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js"; import type { @@ -27,7 +29,7 @@ export interface ISubscriptionSDK { unsubscribe(contentTopics: ContentTopic[]): Promise; - ping(): Promise; + ping(peerId?: PeerId): Promise; unsubscribeAll(): Promise; } diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index dd3fd739b6..f5e9c47cb2 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -131,6 +131,35 @@ describe.only("Waku Filter: Peer Management: E2E", function () { ).to.eq(false); }); + it("Tracks peer failures correctly", async function () { + const maxPingFailures = 3; + await subscription.subscribe([decoder], () => {}, { + pingsBeforePeerRenewed: maxPingFailures + }); + + const targetPeer = waku.filter.connectedPeers[0]; + await waku.connectionManager.dropConnection(targetPeer.id); + + for (let i = 0; i < maxPingFailures; i++) { + await subscription.ping(targetPeer.id); + } + + // At this point, the peer should not be renewed yet + expect( + waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id)) + ).to.be.true; + + // One more failure should trigger renewal + await subscription.ping(targetPeer.id); + + expect( + waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id)) + ).to.be.false; + expect(waku.filter.connectedPeers.length).to.equal( + waku.filter.numPeersToUse + ); + }); + it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () { for (let i = 0; i < 3; i++) { await subscription.subscribe([decoder], () => {}); From 3516b43f4bcf443784ad6823afd2996937165c65 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jul 2024 14:19:14 +0530 Subject: [PATCH 15/15] chore: address comments --- packages/sdk/src/protocols/filter.ts | 15 ++++----------- .../tests/tests/filter/peer_management.spec.ts | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index fc366a1331..c10e9582db 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -154,16 +154,9 @@ export class SubscriptionManager implements ISubscriptionSDK { } public async ping(peerId?: PeerId): Promise { - if (peerId) { - const result = await this.pingSpecificPeer(peerId); - return result.failure - ? { failures: [result.failure], successes: [] } - : { failures: [], successes: [result.success!] }; - } + const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id); - const promises = this.getPeers().map((peer) => - this.pingSpecificPeer(peer.id) - ); + const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); const results = await Promise.allSettled(promises); return this.handleResult(results, "ping"); @@ -276,7 +269,7 @@ export class SubscriptionManager implements ISubscriptionSDK { if (failures > this.maxPingFailures) { try { - await this._renewPeer(peerId); + await this.renewAndSubscribePeer(peerId); this.peerFailures.delete(peerId.toString()); } catch (error) { log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); @@ -284,7 +277,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async _renewPeer(peerId: PeerId): Promise { + private async renewAndSubscribePeer(peerId: PeerId): Promise { const newPeer = await this.renewPeer(peerId); await this.protocol.subscribe( this.pubsubTopic, diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index f5e9c47cb2..eaf1218458 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -23,7 +23,7 @@ import { teardownNodesWithRedundancy } from "../filter/utils.js"; -describe.only("Waku Filter: Peer Management: E2E", function () { +describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet;