From fe81a26595708b676d8a347239bf0d78cf32afb3 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 27 Nov 2023 18:33:28 +0530 Subject: [PATCH] filter: createSubscription takes shardInfo instead of pubsubTopicStr --- packages/core/src/lib/filter/index.ts | 8 +++++++- packages/interfaces/src/filter.ts | 8 ++++++-- .../tests/filter/multiple_pubsub.node.spec.ts | 16 +++++++++++----- .../tests/tests/filter/subscribe.node.spec.ts | 2 +- packages/tests/tests/filter/utils.ts | 1 + 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 0b8f3377f7..16b53deba1 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -14,12 +14,14 @@ import type { PeerIdStr, ProtocolCreateOptions, PubsubTopic, + SingleTopicShardInfo, Unsubscribe } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, + singleTopicShardInfoToPubsubTopic, toAsyncIterator } from "@waku/utils"; import { Logger } from "@waku/utils"; @@ -289,8 +291,12 @@ class Filter extends BaseProtocol implements IReceiver { } async createSubscription( - pubsubTopic: string = DefaultPubsubTopic + pubsubTopicShardInfo?: SingleTopicShardInfo ): Promise { + const pubsubTopic = pubsubTopicShardInfo + ? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic; + ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); //TODO: get a relevant peer for the topic/shard diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 34dad0f87c..580c14cdb9 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,6 +1,10 @@ import type { PeerId } from "@libp2p/interface/peer-id"; -import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { + IDecodedMessage, + IDecoder, + SingleTopicShardInfo +} from "./message.js"; import type { ContentTopic } from "./misc.js"; import type { Callback, IBaseProtocol } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; @@ -25,7 +29,7 @@ export interface IFilterSubscription { export type IFilter = IReceiver & IBaseProtocol & { createSubscription( - pubsubTopic?: string, + pubsubTopicShardInfo?: SingleTopicShardInfo, peerId?: PeerId ): Promise; }; diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 65bebd25ff..76b9121a65 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -6,7 +6,10 @@ import type { SingleTopicShardInfo } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { singleTopicShardInfoToPubsubTopic } from "@waku/utils"; +import { + pubsubTopicToSingleTopicShardInfo, + singleTopicShardInfoToPubsubTopic +} from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -65,7 +68,9 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { [customPubsubTopic1, customPubsubTopic2], shardInfo ); - subscription = await waku.filter.createSubscription(customPubsubTopic1); + subscription = await waku.filter.createSubscription( + pubsubTopicToSingleTopicShardInfo(customPubsubTopic1) + ); messageCollector = new MessageCollector(); }); @@ -89,8 +94,9 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { await subscription.subscribe([customDecoder1], messageCollector.callback); // Subscribe from the same lightnode to the 2nd pubsubtopic - const subscription2 = - await waku.filter.createSubscription(customPubsubTopic2); + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleTopicShardInfo(customPubsubTopic2) + ); const messageCollector2 = new MessageCollector(); @@ -131,7 +137,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic const subscription2 = await waku.filter.createSubscription( - customPubsubTopic2, + pubsubTopicToSingleTopicShardInfo(customPubsubTopic2), await nwaku2.getPeerId() ); await nwaku2.ensureSubscriptions([customPubsubTopic2]); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index a3aa765b53..5853158221 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -377,7 +377,7 @@ describe("Waku Filter V2: Subscribe", function () { await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); const subscription2 = await waku.filter.createSubscription( - DefaultPubsubTopic, + undefined, await nwaku2.getPeerId() ); await nwaku2.ensureSubscriptions([DefaultPubsubTopic]); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 86ce149306..d9cb4f0135 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -48,6 +48,7 @@ export async function validatePingError( export async function runNodes( context: Context, + //TODO: change this to use `ShardInfo` instead of `string[]` pubsubTopics: string[], shardInfo?: ShardInfo ): Promise<[NimGoNode, LightNode]> {