-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: create node and subscription by content topic
- Loading branch information
1 parent
f6b62b8
commit c4a7ad1
Showing
3 changed files
with
242 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import type { Multiaddr } from "@multiformats/multiaddr"; | ||
import { | ||
createDecoder, | ||
DecodedMessage, | ||
waitForRemotePeer, | ||
WakuNode | ||
} from "@waku/core"; | ||
import { | ||
Callback, | ||
IDecoder, | ||
IFilterSubscription, | ||
IMetadata, | ||
Protocols | ||
} from "@waku/interfaces"; | ||
import { | ||
contentTopicToPubsubTopic, | ||
shardInfoToPubsubTopics | ||
} from "@waku/utils"; | ||
|
||
import { createLightNode } from "."; | ||
|
||
// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and | ||
// subscription for that content topic. | ||
export async function prepareSubscription( | ||
waku: WakuNode, | ||
contentTopic: string, | ||
peer?: Multiaddr | ||
): Promise<{ | ||
decoder: IDecoder<DecodedMessage>; | ||
subscription: IFilterSubscription; | ||
}> { | ||
// Validate that the Waku node matches assumptions | ||
if (!waku.filter) { | ||
throw new Error("Filter protocol missing from Waku node"); | ||
} | ||
const shardInfo = (waku.libp2p.components.metadata as IMetadata).shardInfo; | ||
if (!shardInfo) { | ||
throw new Error("Shard info missing from Waku node."); | ||
} | ||
|
||
// Validate content topic and ensure node is configured for its corresponding pubsub topic | ||
const pubsubTopics = shardInfoToPubsubTopics(shardInfo); | ||
const pubsubTopic = contentTopicToPubsubTopic(contentTopic); | ||
if (!pubsubTopics.includes(pubsubTopic)) | ||
throw new Error( | ||
"Content topic does not match any pubsub topic in shard info." | ||
); | ||
|
||
// Ensures there is a peer. Without this condition, the subscription will fail to create. | ||
if (peer) { | ||
await waku.dial(peer); | ||
await waitForRemotePeer(waku, [Protocols.Filter]); | ||
} | ||
|
||
// Create decoder and subscription | ||
const decoder = createDecoder(contentTopic, pubsubTopic); | ||
const subscription = await waku.filter.createSubscription(pubsubTopic); | ||
|
||
return { decoder, subscription }; | ||
} | ||
|
||
export async function streamContentTopic( | ||
contentTopic: string, | ||
waku?: WakuNode, | ||
peer?: Multiaddr | ||
): Promise<[ReadableStream<DecodedMessage>, WakuNode]> { | ||
waku = | ||
waku ?? | ||
((await createLightNode({ | ||
shardInfo: { contentTopics: [contentTopic] } | ||
})) as WakuNode); | ||
const { decoder, subscription } = await prepareSubscription( | ||
waku, | ||
contentTopic, | ||
peer | ||
); | ||
|
||
// Create a ReadableStream that receives any messages for the content topic | ||
const messageStream = new ReadableStream<DecodedMessage>({ | ||
async start(controller) { | ||
await subscription.subscribe(decoder, (message) => { | ||
controller.enqueue(message); | ||
}); | ||
}, | ||
cancel() { | ||
return subscription.unsubscribe([contentTopic]); | ||
} | ||
}); | ||
return [messageStream, waku]; | ||
} | ||
|
||
export async function subscribeToContentTopic( | ||
contentTopic: string, | ||
callback: Callback<DecodedMessage>, | ||
waku?: WakuNode, | ||
peer?: Multiaddr | ||
): Promise<[IFilterSubscription, WakuNode]> { | ||
waku = | ||
waku ?? | ||
((await createLightNode({ | ||
shardInfo: { contentTopics: [contentTopic] } | ||
})) as WakuNode); | ||
const { decoder, subscription } = await prepareSubscription( | ||
waku, | ||
contentTopic, | ||
peer | ||
); | ||
await subscription.subscribe(decoder, callback); | ||
return [subscription, waku]; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import { | ||
bytesToUtf8, | ||
createEncoder, | ||
createLightNode, | ||
DEFAULT_CLUSTER_ID, | ||
Protocols, | ||
streamContentTopic, | ||
subscribeToContentTopic, | ||
utf8ToBytes, | ||
waitForRemotePeer, | ||
WakuNode | ||
} from "@waku/sdk"; | ||
import { | ||
contentTopicToPubsubTopic, | ||
pubsubTopicToSingleShardInfo | ||
} from "@waku/utils"; | ||
import { expect } from "chai"; | ||
|
||
import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; | ||
|
||
describe("SDK: Creating by Content Topic", function () { | ||
const ContentTopic = "/myapp/1/latest/proto"; | ||
const testMessage = "Test123"; | ||
let nwaku: ServiceNode; | ||
let waku: WakuNode; | ||
let waku2: WakuNode; | ||
|
||
beforeEach(async function () { | ||
nwaku = new ServiceNode(makeLogFileName(this) + "1"); | ||
await nwaku.start({ | ||
pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)], | ||
lightpush: true, | ||
relay: true, | ||
filter: true, | ||
discv5Discovery: true, | ||
peerExchange: true, | ||
clusterId: DEFAULT_CLUSTER_ID | ||
}); | ||
}); | ||
|
||
afterEach(async function () { | ||
await tearDownNodes(nwaku, [waku, waku2]); | ||
}); | ||
|
||
it("given a content topic, creates a waku node and filter subscription", async function () { | ||
const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); | ||
|
||
[, waku] = await subscribeToContentTopic( | ||
ContentTopic, | ||
() => {}, | ||
undefined, | ||
await nwaku.getMultiaddrWithId() | ||
); | ||
|
||
expect(waku.pubsubTopics).to.include(expectedPubsubTopic); | ||
}); | ||
|
||
it("given a waku node and content topic, creates a filter subscription", async function () { | ||
const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); | ||
|
||
waku = (await createLightNode({ | ||
shardInfo: { contentTopics: [ContentTopic] } | ||
})) as WakuNode; | ||
await subscribeToContentTopic( | ||
ContentTopic, | ||
() => {}, | ||
waku, | ||
await nwaku.getMultiaddrWithId() | ||
); | ||
|
||
expect(waku.pubsubTopics).to.include(expectedPubsubTopic); | ||
}); | ||
|
||
it("receives messages sent to provided content topic through callback", async function () { | ||
const messages: string[] = []; | ||
[, waku] = await subscribeToContentTopic( | ||
ContentTopic, | ||
(msg) => { | ||
messages.push(bytesToUtf8(msg.payload)); | ||
}, | ||
undefined, | ||
await nwaku.getMultiaddrWithId() | ||
); | ||
|
||
waku2 = (await createLightNode({ | ||
shardInfo: { contentTopics: [ContentTopic] } | ||
})) as WakuNode; | ||
await waku2.dial(await nwaku.getMultiaddrWithId()); | ||
await waitForRemotePeer(waku2, [Protocols.LightPush]); | ||
const encoder = createEncoder({ | ||
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( | ||
contentTopicToPubsubTopic(ContentTopic) | ||
), | ||
contentTopic: ContentTopic | ||
}); | ||
await waku2.lightPush?.send(encoder, { | ||
payload: utf8ToBytes(testMessage) | ||
}); | ||
|
||
expect(messages[0]).to.be.eq(testMessage); | ||
}); | ||
|
||
it("receives messages sent to provided content topic through stream", async function () { | ||
let stream; | ||
[stream, waku] = await streamContentTopic( | ||
ContentTopic, | ||
undefined, | ||
await nwaku.getMultiaddrWithId() | ||
); | ||
|
||
waku2 = (await createLightNode({ | ||
shardInfo: { contentTopics: [ContentTopic] } | ||
})) as WakuNode; | ||
await waku2.dial(await nwaku.getMultiaddrWithId()); | ||
await waitForRemotePeer(waku2, [Protocols.LightPush]); | ||
|
||
const encoder = createEncoder({ | ||
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( | ||
contentTopicToPubsubTopic(ContentTopic) | ||
), | ||
contentTopic: ContentTopic | ||
}); | ||
await waku2.lightPush?.send(encoder, { | ||
payload: utf8ToBytes(testMessage) | ||
}); | ||
|
||
const reader = stream.getReader(); | ||
const { value: message } = await reader.read(); | ||
expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage); | ||
}); | ||
}); |