Skip to content

Commit

Permalink
feat: add SDK function for creating node from application and version
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Mar 1, 2024
1 parent 78ee39a commit 93e93ef
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 80 deletions.
73 changes: 1 addition & 72 deletions packages/sdk/src/content_topic.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Multiaddr } from "@multiformats/multiaddr";
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import {
Callback,
IDecoder,
IFilterSubscription,
LightNode,
Expand All @@ -12,16 +11,9 @@ import {
shardInfoToPubsubTopics
} from "@waku/utils";

import { createLightNode } from "./create.js";

interface CreateTopicOptions {
waku?: LightNode;
peer: Multiaddr;
}

// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
// subscription for that content topic.
async function prepareSubscription(
export async function prepareSubscription(
waku: LightNode,
contentTopic: string,
peer: Multiaddr
Expand Down Expand Up @@ -56,66 +48,3 @@ async function prepareSubscription(

return { decoder, subscription };
}

/**
* Creates a subscription and streams all new messages for a content topic.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param opts
*/
export async function streamContentTopic(
contentTopic: string,
opts: CreateTopicOptions
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);

// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
cancel() {
return subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}

/**
* Subscribes to new messages for a content topic via callback function.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param callback Called every time a new message is received on the content topic
* @param opts
*/
export async function subscribeToContentTopic(
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return { subscription, waku: opts.waku };
}
91 changes: 90 additions & 1 deletion packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import { mplex } from "@libp2p/mplex";
import { ping } from "@libp2p/ping";
import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core";
import type { Multiaddr } from "@multiformats/multiaddr";
import {
DecodedMessage,
wakuFilter,
wakuLightPush,
wakuMetadata,
wakuStore
} from "@waku/core";
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import {
Callback,
type CreateLibp2pOptions,
DefaultPubsubTopic,
type FullNode,
IFilterSubscription,
type IMetadata,
type Libp2p,
type Libp2pComponents,
Expand All @@ -26,8 +35,14 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";
import { createLibp2p } from "libp2p";

import { prepareSubscription } from "./content_topic.js";
import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js";

interface CreateTopicOptions {
waku?: LightNode;
peer: Multiaddr;
}

const DEFAULT_NODE_REQUIREMENTS = {
lightPush: 1,
filter: 1,
Expand All @@ -36,6 +51,17 @@ const DEFAULT_NODE_REQUIREMENTS = {

export { Libp2pComponents };

export async function createApplicationNode(
application: string,
version: string,
options?: ProtocolCreateOptions &
Partial<WakuOptions> &
Partial<RelayCreateOptions>
): Promise<LightNode> {
options = options ?? {};
options.shardInfo = { application, version };
return createNode(options);
}
/**
* Create a Waku node configured to use autosharding or static sharding.
*/
Expand Down Expand Up @@ -248,3 +274,66 @@ export async function defaultLibp2p(
}
}) as any as Libp2p; // TODO: make libp2p include it;

Check warning on line 275 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 275 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}

/**
* Subscribes to new messages for a content topic via callback function.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param callback Called every time a new message is received on the content topic
* @param opts
*/
export async function subscribeToContentTopic(
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return { subscription, waku: opts.waku };
}

/**
* Creates a subscription and streams all new messages for a content topic.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param opts
*/
export async function streamContentTopic(
contentTopic: string,
opts: CreateTopicOptions
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);

// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
cancel() {
return subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}
15 changes: 8 additions & 7 deletions packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type {
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { subscribeToContentTopic } from "./content_topic.js";
import { prepareSubscription } from "./content_topic.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -189,12 +189,13 @@ export class WakuNode implements Waku {
peer: Multiaddr,
callback: Callback<DecodedMessage>
): Promise<IFilterSubscription> {
return (
await subscribeToContentTopic(contentTopic, callback, {
waku: this as LightNode,
peer
})
).subscription;
const { decoder, subscription } = await prepareSubscription(
this as LightNode,
contentTopic,
peer
);
await subscription.subscribe(decoder, callback);
return subscription;
}

isStarted(): boolean {
Expand Down

0 comments on commit 93e93ef

Please sign in to comment.