Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sort peers for protocol by least active connections #2089

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,25 @@ import { ensureShardingConfigured, Logger } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
sortPeersByLatency,
sortPeersByLeastActiveConnections
} from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";

type GetPeersOptions = {
prioritizeLatency?: boolean;
numPeers: number;
maxBootstrapPeers: number;
};

const DEFAULT_GET_PEERS_OPTIONS: GetPeersOptions = {
prioritizeLatency: true,
maxBootstrapPeers: 1,
numPeers: 0
};

/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
Expand Down Expand Up @@ -83,21 +96,17 @@ export class BaseProtocol implements IBaseProtocolCore {
* @returns A list of peers that support the protocol sorted by latency.
*/
public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
numPeers: 0
}
options: GetPeersOptions = DEFAULT_GET_PEERS_OPTIONS
): Promise<Peer[]> {
const { maxBootstrapPeers, numPeers, prioritizeLatency } = options;

const activeConnections =
this.components.connectionManager.getConnections();

// Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
activeConnections,
this.peerStore,
[this.multicodec],
this.options?.shardInfo
Expand All @@ -112,24 +121,32 @@ export class BaseProtocol implements IBaseProtocolCore {
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
filteredPeers
);
let filteredAndSortedPeers: Peer[];

if (prioritizeLatency) {
filteredAndSortedPeers = await sortPeersByLatency(
this.peerStore,
filteredPeers
);
} else {
filteredAndSortedPeers = sortPeersByLeastActiveConnections(
filteredPeers,
activeConnections
);
}

if (sortedFilteredPeers.length === 0) {
if (filteredAndSortedPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

if (sortedFilteredPeers.length < numPeers) {
if (filteredAndSortedPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
`Only ${filteredAndSortedPeers.length} peers found. Requested ${numPeers}.`
);
}

return sortedFilteredPeers;
return filteredAndSortedPeers;
}
}
4 changes: 4 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ export type ProtocolCreateOptions = {
* Defaults to 3.
*/
numPeersToUse?: number;
/**
* Prioritize latency over decentralization when selecting peers.
*/
prioritizeLatency?: boolean;
/**
* Byte array used as key for the noise protocol used for connection encryption
* by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
16 changes: 9 additions & 7 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);

await Promise.all(dials);

const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);
Expand All @@ -227,10 +222,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding ${numPeers} additional peers`);
try {
let newPeers = await this.core.allPeers();
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: 0
});

if (newPeers.length === 0) {
this.log.warn("No new peers found.");
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: 0
});
}

newPeers = newPeers
Expand Down
29 changes: 29 additions & 0 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { ShardInfo } from "@waku/interfaces";

import { bytesToUtf8 } from "../bytes/index.js";
import { isDefined } from "../common/is_defined.js";
import { decodeRelayShard } from "../common/relay_shard_codec.js";

/**
Expand Down Expand Up @@ -51,6 +52,34 @@ export async function sortPeersByLatency(
.map((result) => result.peer);
}

/**
* Sorts the list of peers based on the number of active connections.
* @param peers A list of all available peers, that support the protocol and shard.
* @param connections A list of all active connections.
* @returns A list of peers sorted by the number of active connections.
*/
export function sortPeersByLeastActiveConnections(
peers: Peer[],
connections: Connection[]
): Peer[] {
const activePeers = connections
.filter((conn) =>
conn.streams.map((stream) => stream.protocol).filter(isDefined)
)
.map((conn) => conn.remotePeer);

return peers.sort((a, b) => {
const aConnections = activePeers.filter((peerId) =>
peerId.equals(a.id)
).length;
const bConnections = activePeers.filter((peerId) =>
peerId.equals(b.id)
).length;

return bConnections - aConnections;
});
}

/**
* Returns the list of peers that supports the given protocol.
*/
Expand Down
Loading