From 2ec8ebd77caf2d2d872c0b5bdbcb7766c72a05e5 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 21 Jun 2024 17:40:42 -0400 Subject: [PATCH 1/4] feat: leverage protocol peer management --- packages/interfaces/src/filter.ts | 3 +- packages/interfaces/src/protocols.ts | 27 ++++++++++++ packages/interfaces/src/sender.ts | 30 +------------ packages/sdk/src/protocols/base_protocol.ts | 4 +- packages/sdk/src/protocols/filter.ts | 48 +++++++++++---------- packages/sdk/src/protocols/light_push.ts | 8 ++-- 6 files changed, 63 insertions(+), 57 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index b4071a4d26..34b4bfdcd5 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -5,6 +5,7 @@ import type { IBaseProtocolCore, IBaseProtocolSDK, ProtocolError, + ProtocolUseOptions, SDKProtocolResult, ShardingParams } from "./protocols.js"; @@ -34,7 +35,7 @@ export type IFilterSDK = IReceiver & IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { createSubscription( pubsubTopicShardInfo?: ShardingParams | PubsubTopic, - options?: SubscribeOptions + protocolUseOptions?: ProtocolUseOptions ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0371587ae7..7d43c095af 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -43,6 +43,33 @@ export type ApplicationInfo = { export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; +//TODO: merge this with ProtocolCreateOptions or establish distinction +/** + * Options for using LightPush and Filter + */ +export type ProtocolUseOptions = { + /** + * Optional flag to enable auto-retry with exponential backoff + */ + autoRetry?: boolean; + /** + * Optional flag to force using all available peers + */ + forceUseAllPeers?: boolean; + /** + * Optional maximum number of attempts for exponential backoff + */ + maxAttempts?: number; + /** + * Optional initial delay in milliseconds for exponential backoff + */ + initialDelay?: number; + /** + * Optional maximum delay in milliseconds for exponential backoff + */ + maxDelay?: number; +}; + export type ProtocolCreateOptions = { /** * @deprecated diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 672395e84c..c195403a7e 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,36 +1,10 @@ import type { IEncoder, IMessage } from "./message.js"; -import { SDKProtocolResult } from "./protocols.js"; +import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js"; export interface ISender { send: ( encoder: IEncoder, message: IMessage, - sendOptions?: SendOptions + sendOptions?: ProtocolUseOptions ) => Promise; } - -/** - * Options for using LightPush - */ -export type SendOptions = { - /** - * Optional flag to enable auto-retry with exponential backoff - */ - autoRetry?: boolean; - /** - * Optional flag to force using all available peers - */ - forceUseAllPeers?: boolean; - /** - * Optional maximum number of attempts for exponential backoff - */ - maxAttempts?: number; - /** - * Optional initial delay in milliseconds for exponential backoff - */ - initialDelay?: number; - /** - * Optional maximum delay in milliseconds for exponential backoff - */ - maxDelay?: number; -}; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 9688859cea..b1a845df19 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,7 +1,7 @@ import type { Peer, PeerId } from "@libp2p/interface"; import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces"; +import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; interface Options { @@ -86,7 +86,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) */ protected hasPeers = async ( - options: Partial = {} + options: Partial = {} ): Promise => { const { autoRetry = false, diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 559279c314..cbb1123f23 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -14,6 +14,7 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, + ProtocolUseOptions, type PubsubTopic, SDKProtocolResult, type ShardingParams, @@ -45,7 +46,7 @@ const DEFAULT_SUBSCRIBE_OPTIONS = { }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; - readonly peers: Peer[]; + private readonly getPeers: () => Peer[]; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; @@ -56,11 +57,11 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, - remotePeers: Peer[], + getPeers: () => Peer[], private protocol: FilterCore ) { - this.peers = remotePeers; this.pubsubTopic = pubsubTopic; + this.getPeers = getPeers; this.subscriptionCallbacks = new Map(); } @@ -88,7 +89,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const promises = this.peers.map(async (peer) => + const promises = this.getPeers().map(async (peer) => this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) ); @@ -120,7 +121,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribe(contentTopics: ContentTopic[]): Promise { - const promises = this.peers.map(async (peer) => { + const promises = this.getPeers().map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, peer, @@ -145,7 +146,9 @@ export class SubscriptionManager implements ISubscriptionSDK { } async ping(): Promise { - const promises = this.peers.map(async (peer) => this.protocol.ping(peer)); + const promises = this.getPeers().map(async (peer) => + this.protocol.ping(peer) + ); const results = await Promise.allSettled(promises); @@ -153,7 +156,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribeAll(): Promise { - const promises = this.peers.map(async (peer) => + const promises = this.getPeers().map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -314,8 +317,14 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { * @returns The subscription object. */ async createSubscription( - pubsubTopicShardInfo: ShardingParams | PubsubTopic + pubsubTopicShardInfo: ShardingParams | PubsubTopic, + protocolUseOptions?: ProtocolUseOptions ): Promise { + const options = { + autoRetry: true, + ...protocolUseOptions + } as ProtocolUseOptions; + const pubsubTopic = typeof pubsubTopicShardInfo == "string" ? pubsubTopicShardInfo @@ -323,17 +332,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - let peers: Peer[] = []; - try { - peers = await this.protocol.getPeers(); - } catch (error) { - log.error("Error getting peers to initiate subscription: ", error); - return { - error: ProtocolError.GENERIC_FAIL, - subscription: null - }; - } - if (peers.length === 0) { + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { return { error: ProtocolError.NO_PEER_AVAILABLE, subscription: null @@ -341,15 +341,19 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { } log.info( - `Creating filter subscription with ${peers.length} peers: `, - peers.map((peer) => peer.id.toString()) + `Creating filter subscription with ${this.connectedPeers.length} peers: `, + this.connectedPeers.map((peer) => peer.id.toString()) ); const subscription = this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager(pubsubTopic, peers, this.protocol) + new SubscriptionManager( + pubsubTopic, + () => this.connectedPeers, + this.protocol + ) ); return { diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index ae52e06e49..0ae05056dd 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -8,8 +8,8 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, - SDKProtocolResult, - SendOptions + ProtocolUseOptions, + SDKProtocolResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -35,12 +35,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { async send( encoder: IEncoder, message: IMessage, - _options?: SendOptions + _options?: ProtocolUseOptions ): Promise { const options = { autoRetry: true, ..._options - } as SendOptions; + } as ProtocolUseOptions; const successes: PeerId[] = []; const failures: Failure[] = []; From 18d34e14ffb1c62e6a128654ecbdf3f1dafb04a8 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 21 Jun 2024 17:42:38 -0400 Subject: [PATCH 2/4] chore: add test --- .../tests/filter/peer_management.spec.ts | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 packages/tests/tests/filter/peer_management.spec.ts diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts new file mode 100644 index 0000000000..7f6c8642b8 --- /dev/null +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -0,0 +1,75 @@ +import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { + createDecoder, + createEncoder, + DecodedMessage, + utf8ToBytes +} from "@waku/sdk"; +import { expect } from "chai"; +import { describe } from "mocha"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNodesFleet +} from "../../src/index.js"; +import { + runMultipleNodes, + teardownNodesWithRedundancy +} from "../filter/utils.js"; + +describe("Waku Filter: Peer Management: E2E", function () { + this.timeout(15000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + undefined, + undefined, + 5 + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + const pubsubTopic = DefaultPubsubTopic; + const contentTopic = "/test"; + + const encoder = createEncoder({ + pubsubTopic, + contentTopic + }); + + const decoder = createDecoder(contentTopic, pubsubTopic); + + it("Number of peers are maintained correctly", async function () { + const { error, subscription } = + await waku.filter.createSubscription(pubsubTopic); + if (!subscription || error) { + expect.fail("Could not create subscription"); + } + + const messages: DecodedMessage[] = []; + const { failures, successes } = await subscription.subscribe( + [decoder], + (msg) => { + messages.push(msg); + } + ); + + await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(successes.length).to.be.greaterThan(0); + expect(successes.length).to.be.equal(waku.filter.numPeersToUse); + + if (failures) { + expect(failures.length).to.equal(0); + } + }); +}); From 4e26a7809a12a04e74e64d573522305111ed42f1 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 2 Jul 2024 19:32:34 +0530 Subject: [PATCH 3/4] chore: address comments --- packages/interfaces/src/filter.ts | 2 +- packages/interfaces/src/protocols.ts | 2 +- packages/sdk/src/protocols/filter.ts | 26 +++++++++----------------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 34b4bfdcd5..4a315d7279 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -35,7 +35,7 @@ export type IFilterSDK = IReceiver & IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { createSubscription( pubsubTopicShardInfo?: ShardingParams | PubsubTopic, - protocolUseOptions?: ProtocolUseOptions + options?: ProtocolUseOptions ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 7d43c095af..9aeab47285 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -43,7 +43,7 @@ export type ApplicationInfo = { export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; -//TODO: merge this with ProtocolCreateOptions or establish distinction +//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048 /** * Options for using LightPush and Filter */ diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index cbb1123f23..cd3db714e2 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -46,7 +46,6 @@ const DEFAULT_SUBSCRIBE_OPTIONS = { }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; - private readonly getPeers: () => Peer[]; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; @@ -57,11 +56,10 @@ export class SubscriptionManager implements ISubscriptionSDK { constructor( pubsubTopic: PubsubTopic, - getPeers: () => Peer[], + private peers: Peer[], private protocol: FilterCore ) { this.pubsubTopic = pubsubTopic; - this.getPeers = getPeers; this.subscriptionCallbacks = new Map(); } @@ -89,7 +87,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const promises = this.getPeers().map(async (peer) => + const promises = this.peers.map(async (peer) => this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) ); @@ -121,7 +119,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribe(contentTopics: ContentTopic[]): Promise { - const promises = this.getPeers().map(async (peer) => { + const promises = this.peers.map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, peer, @@ -146,9 +144,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async ping(): Promise { - const promises = this.getPeers().map(async (peer) => - this.protocol.ping(peer) - ); + const promises = this.peers.map(async (peer) => this.protocol.ping(peer)); const results = await Promise.allSettled(promises); @@ -156,7 +152,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } async unsubscribeAll(): Promise { - const promises = this.getPeers().map(async (peer) => + const promises = this.peers.map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -318,11 +314,11 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { */ async createSubscription( pubsubTopicShardInfo: ShardingParams | PubsubTopic, - protocolUseOptions?: ProtocolUseOptions + options?: ProtocolUseOptions ): Promise { - const options = { + options = { autoRetry: true, - ...protocolUseOptions + ...options } as ProtocolUseOptions; const pubsubTopic = @@ -349,11 +345,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - new SubscriptionManager( - pubsubTopic, - () => this.connectedPeers, - this.protocol - ) + new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol) ); return { From 0787e1e5cee537cd3b94b61c352a4baf23e0ca71 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 3 Jul 2024 11:52:22 +0530 Subject: [PATCH 4/4] chore: add todo --- packages/tests/tests/filter/peer_management.spec.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 7f6c8642b8..1d4354aeba 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -18,6 +18,8 @@ import { teardownNodesWithRedundancy } from "../filter/utils.js"; +//TODO: add unit tests, + describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); let waku: LightNode;