Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter): use protocol peer management #2047

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
IBaseProtocolCore,
IBaseProtocolSDK,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult,
ShardingParams
} from "./protocols.js";
Expand Down Expand Up @@ -34,7 +35,7 @@ export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions
protocolUseOptions?: ProtocolUseOptions
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
): Promise<CreateSubscriptionResult>;
};

Expand Down
27 changes: 27 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,33 @@ export type ApplicationInfo = {

export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;

//TODO: merge this with ProtocolCreateOptions or establish distinction
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

export type ProtocolCreateOptions = {
/**
* @deprecated
Expand Down
30 changes: 2 additions & 28 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,10 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";

export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
sendOptions?: ProtocolUseOptions
) => Promise<SDKProtocolResult>;
}

/**
* Options for using LightPush
*/
export type SendOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};
4 changes: 2 additions & 2 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
Expand Down Expand Up @@ -86,7 +86,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/
protected hasPeers = async (
options: Partial<SendOptions> = {}
options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,
Expand Down
48 changes: 26 additions & 22 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
type PubsubTopic,
SDKProtocolResult,
type ShardingParams,
Expand Down Expand Up @@ -45,7 +46,7 @@ const DEFAULT_SUBSCRIBE_OPTIONS = {
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
private readonly getPeers: () => Peer[];
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;

Expand All @@ -56,11 +57,11 @@ export class SubscriptionManager implements ISubscriptionSDK {

constructor(
pubsubTopic: PubsubTopic,
remotePeers: Peer[],
getPeers: () => Peer[],
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
private protocol: FilterCore
) {
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.getPeers = getPeers;
this.subscriptionCallbacks = new Map();
}

Expand Down Expand Up @@ -88,7 +89,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.peers.map(async (peer) =>
const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
);

Expand Down Expand Up @@ -120,7 +121,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
}

async unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => {
const promises = this.getPeers().map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
Expand All @@ -145,15 +146,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
}

async ping(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => this.protocol.ping(peer));
const promises = this.getPeers().map(async (peer) =>
this.protocol.ping(peer)
);

const results = await Promise.allSettled(promises);

return this.handleResult(results, "ping");
}

async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) =>
const promises = this.getPeers().map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);

Expand Down Expand Up @@ -314,42 +317,43 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
protocolUseOptions?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
const options = {
autoRetry: true,
...protocolUseOptions
} as ProtocolUseOptions;

const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];

ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);

let peers: Peer[] = [];
try {
peers = await this.protocol.getPeers();
} catch (error) {
log.error("Error getting peers to initiate subscription: ", error);
return {
error: ProtocolError.GENERIC_FAIL,
subscription: null
};
}
if (peers.length === 0) {
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}

log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);

const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, peers, this.protocol)
new SubscriptionManager(
pubsubTopic,
() => this.connectedPeers,
this.protocol
)
);

return {
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/protocols/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
SDKProtocolResult,
SendOptions
ProtocolUseOptions,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

Expand All @@ -35,12 +35,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
async send(
encoder: IEncoder,
message: IMessage,
_options?: SendOptions
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as SendOptions;
} as ProtocolUseOptions;

const successes: PeerId[] = [];
const failures: Failure[] = [];
Expand Down
75 changes: 75 additions & 0 deletions packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { DefaultPubsubTopic, LightNode } from "@waku/interfaces";
import {
createDecoder,
createEncoder,
DecodedMessage,
utf8ToBytes
} from "@waku/sdk";
import { expect } from "chai";
import { describe } from "mocha";

import {
afterEachCustom,
beforeEachCustom,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";

describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
});

afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});

const pubsubTopic = DefaultPubsubTopic;
const contentTopic = "/test";

const encoder = createEncoder({
pubsubTopic,
contentTopic
});

const decoder = createDecoder(contentTopic, pubsubTopic);

it("Number of peers are maintained correctly", async function () {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
const { error, subscription } =
await waku.filter.createSubscription(pubsubTopic);
if (!subscription || error) {
expect.fail("Could not create subscription");
}

const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);

await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);

if (failures) {
expect(failures.length).to.equal(0);
}
});
});
Loading