Skip to content

Commit

Permalink
refactor code, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Sep 30, 2024
1 parent d79df71 commit f536302
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 33 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }],
"prettier/prettier": [
"error",
{
Expand Down
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@types/uuid": "^9.0.8",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"sinon": "^18.0.0",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
"ignore-loader": "^0.1.2",
Expand Down
161 changes: 161 additions & 0 deletions packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { Connection, Peer, PeerId, Stream } from "@libp2p/interface";
import { expect } from "chai";
import sinon from "sinon";

import { StreamManager } from "./stream_manager.js";

const MULTICODEC = "/test";

describe.only("StreamManager", () => {
let eventTarget: EventTarget;
let streamManager: StreamManager;

const mockPeer: Peer = {
id: {
toString() {
return "1";
}
}
} as unknown as Peer;

beforeEach(() => {
eventTarget = new EventTarget();
streamManager = new StreamManager(
MULTICODEC,
() => [],
eventTarget.addEventListener.bind(eventTarget)
);
});

it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("1");
}
});

it("should throw if no connection provided", async () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [];

let error: Error | undefined;
try {
await streamManager.getStream(mockPeer);
} catch (e) {
error = e as Error;
}

expect(error).not.to.be.undefined;
expect(error?.message).to.include(mockPeer.id.toString());
expect(error?.message).to.include(MULTICODEC);
});

it("should create a new stream if no existing for protocol found", async () => {
for (const writeStatus of ["done", "closed", "closing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);

con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("2");

expect(newStreamSpy.calledOnce).to.be.true;
expect(newStreamSpy.calledWith(MULTICODEC)).to.be.true;
}
});

it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } })
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});

it("peer:update - should schedule stream creation IF protocol hit AND no stream found on connection", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.true;
});

it("peer:update - should not schedule stream creation IF protocol hit AND stream found on connection", async () => {
const con1 = createMockConnection();
con1.streams = [
createMockStream({
id: "1",
protocol: MULTICODEC,
writeStatus: "writable"
})
];
streamManager["getConnections"] = (_id) => [con1];

const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;

eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});
});

type MockConnectionOptions = {
status?: string;
open?: number;
};

function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
status: options.status || "open",
timeline: {
open: options.open || 1
}
} as Connection;
}

type MockStreamOptions = {
id?: string;
protocol?: string;
writeStatus?: string;
};

function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
} as Stream;
}
23 changes: 8 additions & 15 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import type {
Connection,
Peer,
PeerId,
PeerUpdate,
Stream
} from "@libp2p/interface";
import type { Peer, PeerId, PeerUpdate, Stream } from "@libp2p/interface";
import type { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { selectConnection } from "./utils.js";
import { selectOpenConnection } from "./utils.js";

export class StreamManager {
private readonly log: Logger;
Expand All @@ -17,9 +11,9 @@ export class StreamManager {
private streamPool: Map<string, Promise<void>> = new Map();

public constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
public addEventListener: Libp2p["addEventListener"]
private multicodec: string,
private getConnections: Libp2p["getConnections"],
private addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
Expand All @@ -46,7 +40,7 @@ export class StreamManager {

private async createStream(peer: Peer, retries = 0): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
const connection = selectOpenConnection(connections);

if (!connection) {
throw new Error(
Expand Down Expand Up @@ -134,9 +128,8 @@ export class StreamManager {
}

private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
const connection: Connection | undefined = this.getConnections(peerId).find(
(c) => c.status === "open"
);
const connections = this.getConnections(peerId);
const connection = selectOpenConnection(connections);

if (!connection) {
return;
Expand Down
65 changes: 65 additions & 0 deletions packages/core/src/lib/stream_manager/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Connection } from "@libp2p/interface";
import { expect } from "chai";

import { selectOpenConnection } from "./utils.js";

describe("selectOpenConnection", () => {
it("returns nothing if no connections present", () => {
const connection = selectOpenConnection([]);

expect(connection).to.be.undefined;
});

it("returns only open connection if one present", () => {
let expectedCon = createMockConnection({ id: "1", status: "closed" });
let actualCon = selectOpenConnection([expectedCon]);

expect(actualCon).to.be.undefined;

expectedCon = createMockConnection({ id: "1", status: "open" });
actualCon = selectOpenConnection([expectedCon]);

expect(actualCon).not.to.be.undefined;
expect(actualCon?.id).to.be.eq("1");
});

it("should return no connections if no open connection provided", () => {
const closedCon1 = createMockConnection({ status: "closed" });
const closedCon2 = createMockConnection({ status: "closed" });
const actualCon = selectOpenConnection([closedCon1, closedCon2]);

expect(actualCon).to.be.undefined;
});

it("should select older connection if present", () => {
const con1 = createMockConnection({
status: "open",
open: 10
});
const con2 = createMockConnection({
status: "open",
open: 15
});

const actualCon = selectOpenConnection([con1, con2]);

expect(actualCon).not.to.be.undefined;
expect(actualCon?.timeline.open).to.be.eq(15);
});
});

type MockConnectionOptions = {
id?: string;
status?: string;
open?: number;
};

function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
id: options.id,
status: options.status,
timeline: {
open: options.open
}
} as Connection;
}
22 changes: 5 additions & 17 deletions packages/core/src/lib/stream_manager/utils.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,10 @@
import type { Connection } from "@libp2p/interface";

export function selectConnection(
export function selectOpenConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];

let latestConnection: Connection | undefined;

connections.forEach((connection) => {
if (connection.status === "open") {
if (!latestConnection) {
latestConnection = connection;
} else if (connection.timeline.open > latestConnection.timeline.open) {
latestConnection = connection;
}
}
});

return latestConnection;
return connections
.filter((c) => c.status === "open")
.sort((left, right) => right.timeline.open - left.timeline.open)
.at(0);
}

0 comments on commit f536302

Please sign in to comment.