Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enhancing protocol peer management with mutex locks #2146

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 18 additions & 23 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
} from "@waku/utils/libp2p";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";
Expand All @@ -26,7 +22,7 @@

protected constructor(
public multicodec: string,
private components: Libp2pComponents,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
Expand All @@ -50,24 +46,24 @@
return this.streamManager.getStream(peer);
}

public get peerStore(): PeerStore {
return this.components.peerStore;
}

//TODO: move to SDK
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
}

public async connectedPeers(): Promise<Peer[]> {
public async connectedPeers(withOpenStreams = false): Promise<Peer[]> {

Check failure on line 59 in packages/core/src/lib/base_protocol.ts

View workflow job for this annotation

GitHub Actions / proto

'withOpenStreams' is assigned a value but never used
const peers = await this.allPeers();
return peers.filter((peer) => {
return (
this.components.connectionManager.getConnections(peer.id).length > 0
const connections = this.components.connectionManager.getConnections(
peer.id
);
return connections.some((c) =>
c.streams.some((s) => s.protocol === this.multicodec)
);
});
}
Expand All @@ -77,9 +73,8 @@
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.

* @returns A list of peers that support the protocol sorted by latency.
*/
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
Expand All @@ -88,29 +83,29 @@
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =

Check failure on line 91 in packages/core/src/lib/base_protocol.ts

View workflow job for this annotation

GitHub Actions / proto

'connectedPeersForProtocolAndShard' is assigned a value but never used
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
this.components.peerStore,
[this.multicodec],
pubsubTopicsToShardInfo(this.pubsubTopics)
);

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
connectedPeersForProtocolAndShard,
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
this.components.peerStore,
filteredPeers
);

Expand Down
15 changes: 13 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
private handleError: (error: Error) => Promise<void>,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
Expand Down Expand Up @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
async (e) => {
log.error(
"Error with receiving pipe",
e,
" -- ",
"on peer ",
connection.remotePeer.toString(),
" -- ",
"stream ",
stream
);
await this.handleError(e);
}
);
} catch (e) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata {
pubsubTopicsToShardInfo(this.pubsubTopics)
);

const peer = await this.peerStore.get(peerId);
const peer = await this.libp2pComponents.peerStore.get(peerId);
if (!peer) {
return {
shardInfo: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.peerStore.get(peerId);
const peer = await this.components.peerStore.get(peerId);
if (!peer) {
return {
peerInfos: null,
Expand Down
18 changes: 3 additions & 15 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";
import type { Peer } from "@libp2p/interface";

import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
Expand All @@ -18,14 +18,14 @@ export type IBaseProtocolCore = {
multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>;
connectedPeers: () => Promise<Peer[]>;
connectedPeers: (withOpenStreams?: boolean) => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"];
removeLibp2pEventListener: Libp2p["removeEventListener"];
};

export type IBaseProtocolSDK = {
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
readonly numPeersToUse: number;
};

Expand All @@ -36,10 +36,6 @@ export type NetworkConfig = StaticSharding | AutoSharding;
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
Expand All @@ -48,14 +44,6 @@ export type ProtocolUseOptions = {
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

export type ProtocolCreateOptions = {
Expand Down
5 changes: 3 additions & 2 deletions packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2",
"@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2",
Expand All @@ -67,9 +67,10 @@
"@waku/core": "0.0.32",
"@waku/discovery": "0.0.5",
"@waku/interfaces": "0.0.27",
"@waku/message-hash": "0.1.16",
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {
Expand Down
Loading
Loading