From 79d7e16bc39ae3df708a97ac1c56a2d71739ef09 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 29 Jul 2024 13:24:19 +0530 Subject: [PATCH 1/3] chore: sort peers by least active connections for protocols --- packages/core/src/lib/base_protocol.ts | 49 ++++++++++++++++---------- packages/utils/src/libp2p/index.ts | 29 +++++++++++++++ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 64dd435d7a..e9d21ae79c 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -10,12 +10,19 @@ import { ensureShardingConfigured, Logger } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, getPeersForProtocol, - sortPeersByLatency + sortPeersByLatency, + sortPeersByLeastActiveConnections } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; +type GetPeersOptions = { + prioritizeLatency?: boolean; + numPeers: number; + maxBootstrapPeers: number; +}; + /** * A class with predefined helpers, to be used as a base to implement Waku * Protocols. @@ -83,21 +90,19 @@ export class BaseProtocol implements IBaseProtocolCore { * @returns A list of peers that support the protocol sorted by latency. */ public async getPeers( - { - numPeers, - maxBootstrapPeers - }: { - numPeers: number; - maxBootstrapPeers: number; - } = { + { prioritizeLatency, numPeers, maxBootstrapPeers }: GetPeersOptions = { + prioritizeLatency: true, maxBootstrapPeers: 1, numPeers: 0 } ): Promise { + const activeConnections = + this.components.connectionManager.getConnections(); + // Retrieve all connected peers that support the protocol & shard (if configured) const connectedPeersForProtocolAndShard = await getConnectedPeersForProtocolAndShard( - this.components.connectionManager.getConnections(), + activeConnections, this.peerStore, [this.multicodec], this.options?.shardInfo @@ -112,24 +117,32 @@ export class BaseProtocol implements IBaseProtocolCore { maxBootstrapPeers ); - // Sort the peers by latency - const sortedFilteredPeers = await sortPeersByLatency( - this.peerStore, - filteredPeers - ); + let filteredAndSortedPeers: Peer[]; + + if (prioritizeLatency) { + filteredAndSortedPeers = await sortPeersByLatency( + this.peerStore, + filteredPeers + ); + } else { + filteredAndSortedPeers = sortPeersByLeastActiveConnections( + filteredPeers, + activeConnections + ); + } - if (sortedFilteredPeers.length === 0) { + if (filteredAndSortedPeers.length === 0) { this.log.warn( "No peers found. Ensure you have a connection to the network." ); } - if (sortedFilteredPeers.length < numPeers) { + if (filteredAndSortedPeers.length < numPeers) { this.log.warn( - `Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.` + `Only ${filteredAndSortedPeers.length} peers found. Requested ${numPeers}.` ); } - return sortedFilteredPeers; + return filteredAndSortedPeers; } } diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index fa9b97c85d..a8f925d6a9 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -2,6 +2,7 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import { ShardInfo } from "@waku/interfaces"; import { bytesToUtf8 } from "../bytes/index.js"; +import { isDefined } from "../common/is_defined.js"; import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** @@ -51,6 +52,34 @@ export async function sortPeersByLatency( .map((result) => result.peer); } +/** + * Sorts the list of peers based on the number of active connections. + * @param peers A list of all available peers, that support the protocol and shard. + * @param connections A list of all active connections. + * @returns A list of peers sorted by the number of active connections. + */ +export function sortPeersByLeastActiveConnections( + peers: Peer[], + connections: Connection[] +): Peer[] { + const activePeers = connections + .filter((conn) => + conn.streams.map((stream) => stream.protocol).filter(isDefined) + ) + .map((conn) => conn.remotePeer); + + return peers.sort((a, b) => { + const aConnections = activePeers.filter((peerId) => + peerId.equals(a.id) + ).length; + const bConnections = activePeers.filter((peerId) => + peerId.equals(b.id) + ).length; + + return bConnections - aConnections; + }); +} + /** * Returns the list of peers that supports the given protocol. */ From a69d0c252a2df3ec91a828d528e599616fd131ce Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 29 Jul 2024 13:47:15 +0530 Subject: [PATCH 2/3] chore: wrap option in a type --- packages/core/src/lib/base_protocol.ts | 4 +++- packages/interfaces/src/protocols.ts | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index e9d21ae79c..eec9bdce82 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -90,12 +90,14 @@ export class BaseProtocol implements IBaseProtocolCore { * @returns A list of peers that support the protocol sorted by latency. */ public async getPeers( - { prioritizeLatency, numPeers, maxBootstrapPeers }: GetPeersOptions = { + options: GetPeersOptions = { prioritizeLatency: true, maxBootstrapPeers: 1, numPeers: 0 } ): Promise { + const { maxBootstrapPeers, numPeers, prioritizeLatency } = options; + const activeConnections = this.components.connectionManager.getConnections(); diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 6d0a051513..cb9252cecb 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -120,6 +120,10 @@ export type ProtocolCreateOptions = { * Defaults to 3. */ numPeersToUse?: number; + /** + * Prioritize latency over decentralization when selecting peers. + */ + prioritizeLatency?: boolean; /** * Byte array used as key for the noise protocol used for connection encryption * by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) From be9588d90e26c17adeabe5df26640e7deff5467b Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 6 Aug 2024 13:31:53 +0700 Subject: [PATCH 3/3] chore: choose peers from the connected pool, instead of dialing them --- packages/core/src/lib/base_protocol.ts | 12 +++++++----- packages/sdk/src/protocols/base_protocol.ts | 16 +++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index eec9bdce82..eeb73e9200 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -23,6 +23,12 @@ type GetPeersOptions = { maxBootstrapPeers: number; }; +const DEFAULT_GET_PEERS_OPTIONS: GetPeersOptions = { + prioritizeLatency: true, + maxBootstrapPeers: 1, + numPeers: 0 +}; + /** * A class with predefined helpers, to be used as a base to implement Waku * Protocols. @@ -90,11 +96,7 @@ export class BaseProtocol implements IBaseProtocolCore { * @returns A list of peers that support the protocol sorted by latency. */ public async getPeers( - options: GetPeersOptions = { - prioritizeLatency: true, - maxBootstrapPeers: 1, - numPeers: 0 - } + options: GetPeersOptions = DEFAULT_GET_PEERS_OPTIONS ): Promise { const { maxBootstrapPeers, numPeers, prioritizeLatency } = options; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 8b6343a895..e5418b47b8 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -198,11 +198,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.log.info(`Finding and adding ${numPeers} new peers`); try { const additionalPeers = await this.findAdditionalPeers(numPeers); - const dials = additionalPeers.map((peer) => - this.connectionManager.attemptDial(peer.id) - ); - - await Promise.all(dials); const updatedPeers = [...this.peers, ...additionalPeers]; this.updatePeers(updatedPeers); @@ -227,10 +222,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { private async findAdditionalPeers(numPeers: number): Promise { this.log.info(`Finding ${numPeers} additional peers`); try { - let newPeers = await this.core.allPeers(); + let newPeers = await this.core.getPeers({ + maxBootstrapPeers: 0, + numPeers: 0 + }); if (newPeers.length === 0) { - this.log.warn("No new peers found."); + this.log.warn("No new peers found, trying with bootstrap peers"); + newPeers = await this.core.getPeers({ + maxBootstrapPeers: numPeers, + numPeers: 0 + }); } newPeers = newPeers