Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lightPush): improve peer usage and improve readability #2155

Merged
merged 12 commits into from
Oct 4, 2024
Merged
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"p-event": "^6.0.1",
"uint8arraylist": "^2.4.3",
"uuid": "^9.0.0"
},
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";

export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";

Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/light_push.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import type { ISender } from "./sender.js";

export type ILightPushSDK = ISender &
export type ILightPush = ISender &
weboko marked this conversation as resolved.
Show resolved Hide resolved
IBaseProtocolSDK & { protocol: IBaseProtocolCore };
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export type ProtocolCreateOptions = {
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* Defaults to 3.
* Defaults to 2.
*/
numPeersToUse?: number;
/**
Expand Down
6 changes: 3 additions & 3 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStoreSDK } from "./store.js";
Expand All @@ -15,7 +15,7 @@ export interface Waku {
relay?: IRelay;
store?: IStoreSDK;
filter?: IFilterSDK;
lightPush?: ILightPushSDK;
lightPush?: ILightPush;

connectionManager: IConnectionManager;

Expand All @@ -36,7 +36,7 @@ export interface LightNode extends Waku {
relay: undefined;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
lightPush: ILightPush;
}

export interface RelayNode extends Waku {
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"libp2p": "^1.8.1"
"libp2p": "^1.8.1",
"p-event": "^6.0.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export { createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
Expand All @@ -14,10 +14,12 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuLightPush } from "./protocols/light_push/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store/index.js";

export { waitForRemotePeer } from "./wait_for_remote_peer.js";

export * as waku from "@waku/core";
export * as utils from "@waku/utils";
export * from "@waku/interfaces";
2 changes: 1 addition & 1 deletion packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interface Options {
}

const RENEW_TIME_LOCK_DURATION = 30 * 1000;
const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/src/protocols/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { wakuLightPush } from "./light_push.js";
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core";
import {
Failure,
type IEncoder,
ILightPushSDK,
ILightPush,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
Expand All @@ -19,14 +19,14 @@ import { BaseProtocolSDK } from "../base_protocol.js";

const log = new Logger("sdk:light-push");

class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;

private readonly reliabilityMonitor: SenderReliabilityMonitor;

public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
Expand All @@ -49,11 +49,6 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
message: IMessage,
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options are not being accounted for in send anymore

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly! as I mentioned here it will be follow up

#2137 (comment)

const options = {
autoRetry: true,
..._options
} as ProtocolUseOptions;

const successes: PeerId[] = [];
const failures: Failure[] = [];

Expand All @@ -63,17 +58,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
} catch (error) {
log.error("Failed to send waku light push: pubsub topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
],
successes: []
]
};
}

const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
const peers = await this.getConnectedPeers();
if (peers.length === 0) {
return {
successes,
failures: [
Expand All @@ -84,40 +79,37 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
};
}

const sendPromises = this.connectedPeers.map((peer) =>
this.protocol.send(encoder, message, peer)
const results = await Promise.allSettled(
peers.map((peer) => this.protocol.send(encoder, message, peer))
);

const results = await Promise.allSettled(sendPromises);

for (const result of results) {
if (result.status === "fulfilled") {
const { failure, success } = result.value;
if (success) {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}
}
} else {
if (result.status !== "fulfilled") {
log.error("Failed unexpectedly while sending:", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL });
continue;
}

const { failure, success } = result.value;

if (success) {
successes.push(success);
continue;
}

if (failure) {
failures.push(failure);

const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);

if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
);
Comment on lines +108 to +110
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we test this?
So seems like we are not relying on PeerManager anymore to retrieve peers to be used for the protocols: moved from hasPeers() which relies on PeerManager -> getConnectedPeers() which gets all available connections, I'm curious how renewing peers would affect management. Wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is something that was there so I assume we should have tests there

for connections - yes, we move away but not from hasPeers but form BaseProtocolSDK.connectedPeers that proved to be out of sync quite often

the reason for it is:

  • to simplify process for LightPush
  • alight with status-go usage of LightPush that proved to be reliable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe hasPeers would work with tests. Considering if it would be required to double check with getConnectedPeers, especially as we do renewals and what not

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseProtocolSDK.connectedPeers() was being inconsistent because of race conditions in shared peer management, which isn't the case with #2137

}
}
}

Expand All @@ -126,11 +118,38 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
failures
};
}

private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p
.getConnections()
.filter((c) => c.status === "open")
.sort((left, right) => right.timeline.open - left.timeline.open)
.map((c) => c.remotePeer);

weboko marked this conversation as resolved.
Show resolved Hide resolved
if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (p as Peer).protocols.includes(LightPushCodec))
.slice(0, this.numPeersToUse) as Peer[];
}
}

export function wakuLightPush(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPushSDK {
return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init);
): (libp2p: Libp2p) => ILightPush {
return (libp2p: Libp2p) => new LightPush(connectionManager, libp2p, init);
}
Loading
Loading