Skip to content

Commit

Permalink
filter: createSubscription takes shardInfo instead of pubsubTopicStr
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Nov 27, 2023
1 parent ab4831c commit fe81a26
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
8 changes: 7 additions & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import type {
PeerIdStr,
ProtocolCreateOptions,
PubsubTopic,
SingleTopicShardInfo,
Unsubscribe
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
singleTopicShardInfoToPubsubTopic,
toAsyncIterator
} from "@waku/utils";
import { Logger } from "@waku/utils";
Expand Down Expand Up @@ -289,8 +291,12 @@ class Filter extends BaseProtocol implements IReceiver {
}

async createSubscription(
pubsubTopic: string = DefaultPubsubTopic
pubsubTopicShardInfo?: SingleTopicShardInfo
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

//TODO: get a relevant peer for the topic/shard
Expand Down
8 changes: 6 additions & 2 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type { PeerId } from "@libp2p/interface/peer-id";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
IDecodedMessage,
IDecoder,
SingleTopicShardInfo
} from "./message.js";
import type { ContentTopic } from "./misc.js";
import type { Callback, IBaseProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";
Expand All @@ -25,7 +29,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopic?: string,
pubsubTopicShardInfo?: SingleTopicShardInfo,
peerId?: PeerId
): Promise<IFilterSubscription>;
};
16 changes: 11 additions & 5 deletions packages/tests/tests/filter/multiple_pubsub.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import type {
SingleTopicShardInfo
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { singleTopicShardInfoToPubsubTopic } from "@waku/utils";
import {
pubsubTopicToSingleTopicShardInfo,
singleTopicShardInfoToPubsubTopic
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

Expand Down Expand Up @@ -65,7 +68,9 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
[customPubsubTopic1, customPubsubTopic2],
shardInfo
);
subscription = await waku.filter.createSubscription(customPubsubTopic1);
subscription = await waku.filter.createSubscription(
pubsubTopicToSingleTopicShardInfo(customPubsubTopic1)
);
messageCollector = new MessageCollector();
});

Expand All @@ -89,8 +94,9 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);

// Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 =
await waku.filter.createSubscription(customPubsubTopic2);
const subscription2 = await waku.filter.createSubscription(
pubsubTopicToSingleTopicShardInfo(customPubsubTopic2)
);

const messageCollector2 = new MessageCollector();

Expand Down Expand Up @@ -131,7 +137,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {

// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = await waku.filter.createSubscription(
customPubsubTopic2,
pubsubTopicToSingleTopicShardInfo(customPubsubTopic2),
await nwaku2.getPeerId()
);
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
Expand Down
2 changes: 1 addition & 1 deletion packages/tests/tests/filter/subscribe.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ describe("Waku Filter V2: Subscribe", function () {
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
const subscription2 = await waku.filter.createSubscription(
DefaultPubsubTopic,
undefined,
await nwaku2.getPeerId()
);
await nwaku2.ensureSubscriptions([DefaultPubsubTopic]);
Expand Down
1 change: 1 addition & 0 deletions packages/tests/tests/filter/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export async function validatePingError(

export async function runNodes(
context: Context,
//TODO: change this to use `ShardInfo` instead of `string[]`
pubsubTopics: string[],
shardInfo?: ShardInfo
): Promise<[NimGoNode, LightNode]> {
Expand Down

0 comments on commit fe81a26

Please sign in to comment.