diff --git a/packages/sdk/src/content_topic.ts b/packages/sdk/src/content_topic.ts index c4a5c789bf..77d5e07de2 100644 --- a/packages/sdk/src/content_topic.ts +++ b/packages/sdk/src/content_topic.ts @@ -1,7 +1,6 @@ import type { Multiaddr } from "@multiformats/multiaddr"; import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; import { - Callback, IDecoder, IFilterSubscription, LightNode, @@ -12,16 +11,9 @@ import { shardInfoToPubsubTopics } from "@waku/utils"; -import { createLightNode } from "./create.js"; - -interface CreateTopicOptions { - waku?: LightNode; - peer: Multiaddr; -} - // Given a Waku node, peer Multiaddr, and content topic, creates a decoder and // subscription for that content topic. -async function prepareSubscription( +export async function prepareSubscription( waku: LightNode, contentTopic: string, peer: Multiaddr @@ -56,66 +48,3 @@ async function prepareSubscription( return { decoder, subscription }; } - -/** - * Creates a subscription and streams all new messages for a content topic. - * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. - * Assumes node is using autosharding. - * @param contentTopic - * @param opts - */ -export async function streamContentTopic( - contentTopic: string, - opts: CreateTopicOptions -): Promise<[ReadableStream, LightNode]> { - opts.waku = - opts.waku ?? - (await createLightNode({ - shardInfo: { contentTopics: [contentTopic] } - })); - const { decoder, subscription } = await prepareSubscription( - opts.waku, - contentTopic, - opts.peer - ); - - // Create a ReadableStream that receives any messages for the content topic - const messageStream = new ReadableStream({ - async start(controller) { - await subscription.subscribe(decoder, (message) => { - controller.enqueue(message); - }); - }, - cancel() { - return subscription.unsubscribe([contentTopic]); - } - }); - return [messageStream, opts.waku]; -} - -/** - * Subscribes to new messages for a content topic via callback function. - * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. - * Assumes node is using autosharding. - * @param contentTopic - * @param callback Called every time a new message is received on the content topic - * @param opts - */ -export async function subscribeToContentTopic( - contentTopic: string, - callback: Callback, - opts: CreateTopicOptions -): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { - opts.waku = - opts.waku ?? - (await createLightNode({ - shardInfo: { contentTopics: [contentTopic] } - })); - const { decoder, subscription } = await prepareSubscription( - opts.waku, - contentTopic, - opts.peer - ); - await subscription.subscribe(decoder, callback); - return { subscription, waku: opts.waku }; -} diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 3cdb809ebf..6ba5ea9ca6 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -6,12 +6,21 @@ import { mplex } from "@libp2p/mplex"; import { ping } from "@libp2p/ping"; import { webSockets } from "@libp2p/websockets"; import { all as filterAll } from "@libp2p/websockets/filters"; -import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core"; +import type { Multiaddr } from "@multiformats/multiaddr"; +import { + DecodedMessage, + wakuFilter, + wakuLightPush, + wakuMetadata, + wakuStore +} from "@waku/core"; import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import { + Callback, type CreateLibp2pOptions, DefaultPubsubTopic, type FullNode, + IFilterSubscription, type IMetadata, type Libp2p, type Libp2pComponents, @@ -26,8 +35,14 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { ensureShardingConfigured } from "@waku/utils"; import { createLibp2p } from "libp2p"; +import { prepareSubscription } from "./content_topic.js"; import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js"; +interface CreateTopicOptions { + waku?: LightNode; + peer: Multiaddr; +} + const DEFAULT_NODE_REQUIREMENTS = { lightPush: 1, filter: 1, @@ -36,6 +51,17 @@ const DEFAULT_NODE_REQUIREMENTS = { export { Libp2pComponents }; +export async function createApplicationNode( + application: string, + version: string, + options?: ProtocolCreateOptions & + Partial & + Partial +): Promise { + options = options ?? {}; + options.shardInfo = { application, version }; + return createNode(options); +} /** * Create a Waku node configured to use autosharding or static sharding. */ @@ -248,3 +274,66 @@ export async function defaultLibp2p( } }) as any as Libp2p; // TODO: make libp2p include it; } + +/** + * Subscribes to new messages for a content topic via callback function. + * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. + * Assumes node is using autosharding. + * @param contentTopic + * @param callback Called every time a new message is received on the content topic + * @param opts + */ +export async function subscribeToContentTopic( + contentTopic: string, + callback: Callback, + opts: CreateTopicOptions +): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + await subscription.subscribe(decoder, callback); + return { subscription, waku: opts.waku }; +} + +/** + * Creates a subscription and streams all new messages for a content topic. + * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. + * Assumes node is using autosharding. + * @param contentTopic + * @param opts + */ +export async function streamContentTopic( + contentTopic: string, + opts: CreateTopicOptions +): Promise<[ReadableStream, LightNode]> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + + // Create a ReadableStream that receives any messages for the content topic + const messageStream = new ReadableStream({ + async start(controller) { + await subscription.subscribe(decoder, (message) => { + controller.enqueue(message); + }); + }, + cancel() { + return subscription.unsubscribe([contentTopic]); + } + }); + return [messageStream, opts.waku]; +} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index a4649b5be3..ff4825c104 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,7 +17,7 @@ import type { import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { subscribeToContentTopic } from "./content_topic.js"; +import { prepareSubscription } from "./content_topic.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -189,12 +189,13 @@ export class WakuNode implements Waku { peer: Multiaddr, callback: Callback ): Promise { - return ( - await subscribeToContentTopic(contentTopic, callback, { - waku: this as LightNode, - peer - }) - ).subscription; + const { decoder, subscription } = await prepareSubscription( + this as LightNode, + contentTopic, + peer + ); + await subscription.subscribe(decoder, callback); + return subscription; } isStarted(): boolean {