Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter): peer/subscription renewal with recurring Filter pings #2052

Merged
merged 15 commits into from
Jul 10, 2024
Merged
5 changes: 4 additions & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type {
Expand All @@ -13,6 +15,7 @@ import type { IReceiver } from "./receiver.js";

export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
};

export type IFilter = IReceiver & IBaseProtocolCore;
Expand All @@ -26,7 +29,7 @@ export interface ISubscriptionSDK {

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;

ping(): Promise<SDKProtocolResult>;
ping(peerId?: PeerId): Promise<SDKProtocolResult>;

unsubscribeAll(): Promise<SDKProtocolResult>;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type IBaseProtocolCore = {
};

export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<void>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
readonly numPeersToUse: number;
};
Expand Down
39 changes: 21 additions & 18 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,24 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
/**
* Disconnects from a peer and tries to find a new one to replace it.
* @param peerToDisconnect The peer to disconnect from.
* @returns The new peer that was found and connected to.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<void> {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
try {
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

await this.findAndAddPeers(1);
} catch (error) {
this.log.info(
"Peer renewal failed, relying on the interval to find a new peer"
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
throw new Error(
"Failed to find a new peer to replace the disconnected one"
);
}

return peer;
}

/**
Expand Down Expand Up @@ -171,14 +173,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<void> {
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
this.peers = [...this.peers, ...additionalPeers];
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
Expand All @@ -197,20 +200,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: numPeers
numPeers: 0
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
});

if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: numPeers
numPeers: 0
});
}

newPeers = newPeers.filter(
(peer) => this.peers.some((p) => p.id === peer.id) === false
);
newPeers = newPeers
.filter((peer) => this.peers.some((p) => p.id === peer.id) === false)
.slice(0, numPeers);
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
Expand Down
119 changes: 91 additions & 28 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
Expand Down Expand Up @@ -41,13 +42,17 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
const log = new Logger("sdk:filter");

const MINUTE = 60 * 1000;
const DEFAULT_MAX_PINGS = 3;

const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private peerFailures: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;

private subscriptionCallbacks: Map<
ContentTopic,
Expand All @@ -56,18 +61,21 @@ export class SubscriptionManager implements ISubscriptionSDK {

constructor(
pubsubTopic: PubsubTopic,
private peers: Peer[],
private protocol: FilterCore
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}

async subscribe<T extends IDecodedMessage>(
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;

const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

// check that all decoders are configured for the same pubsub topic as this subscription
Expand All @@ -87,7 +95,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.peers.map(async (peer) =>
const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
);

Expand All @@ -111,15 +119,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
if (options.keepAlive) {
this.startKeepAlivePings(options);
}

return finalResult;
}

async unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => {
public async unsubscribe(
contentTopics: ContentTopic[]
): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
Expand All @@ -143,16 +153,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}

async ping(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => this.protocol.ping(peer));
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);

const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
const results = await Promise.allSettled(promises);

return this.handleResult(results, "ping");
}

async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) =>
public async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);

Expand Down Expand Up @@ -217,31 +228,78 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
}
return result;
}

// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
const peer = this.getPeers().find((p) => p.id.equals(peerId));
if (!peer) {
return {
success: null,
failure: {
peerId,
error: ProtocolError.NO_PEER_AVAILABLE
}
};
}

return result;
try {
const result = await this.protocol.ping(peer);
if (result.failure) {
await this.handlePeerFailure(peerId);
} else {
this.peerFailures.delete(peerId.toString());
}
return result;
} catch (error) {
await this.handlePeerFailure(peerId);
return {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
}
}

private async handlePeerFailure(peerId: PeerId): Promise<void> {
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
this.peerFailures.set(peerId.toString(), failures);

if (failures > this.maxPingFailures) {
try {
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
}
}
}

private startKeepAlivePings(interval: number): void {
private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);

return newPeer;
}

private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
const run = async (): Promise<void> => {
try {
log.info("Recurring ping to peers.");
await this.ping();
} catch (error) {
log.error("Stopping recurring pings due to failure", error);
this.stopKeepAlivePings();
}
};

void run();
}, interval) as unknown as number;
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
}, keepAlive) as unknown as number;
}

private stopKeepAlivePings(): void {
Expand Down Expand Up @@ -345,7 +403,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol)
new SubscriptionManager(
pubsubTopic,
this.protocol,
() => this.connectedPeers,
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
this.renewPeer.bind(this)
)
);

return {
Expand Down
7 changes: 6 additions & 1 deletion packages/sdk/src/protocols/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
if (failure) {
if (failure.peerId) {
await this.renewPeer(failure.peerId);
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
}
}

failures.push(failure);
Expand Down
Loading
Loading