Skip to content

Commit

Permalink
refactor: tracker announce & disconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DimaDemchenko committed Jun 20, 2024
1 parent be047d4 commit c5f6a33
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 147 deletions.
254 changes: 115 additions & 139 deletions lib/fast-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class FastTracker implements Tracker {
public constructor(settings?: Partial<Settings>) {
this.settings = {
maxOffers: 20,
announceInterval: 120,
announceInterval: 20,
...settings,
};
this.startClearPeersInterval();
Expand All @@ -54,15 +54,56 @@ export class FastTracker implements Tracker {
this.clearPeersInterval = setInterval(() => {
const now = performance.now();
for (const peer of this.#peersContext.values()) {
if (now - peer.lastAccessed > this.settings.announceInterval * 2) {
// TODO: disconnect only current peer
this.disconnectPeer(peer.ws as unknown as SocketContext);
if (
now - peer.lastAccessed >
this.settings.announceInterval * 2 * 1000
) {
this.removePeer(peer);
}
}
}, this.settings.announceInterval * 1000);
}
}

private removeEmptySwarm(swarm: Swarm) {
if (swarm.peers.length === 0) {
if (debugEnabled) {
debug(
"disconnect peer: swarm removed (empty)",
Buffer.from(swarm.infoHash).toString("hex"),
);
}
this.#swarms.delete(swarm.infoHash);
}
}

private removePeer(peerContext: PeerContext) {
const swarm = this.#swarms.get(peerContext.swarmInfoHash);

if (swarm === undefined) {
throw new TrackerError("disconnect peer: swarm is undefined");
}

swarm.removePeer(peerContext);

this.removeEmptySwarm(swarm);

const peerId = peerContext.peerId;

if (debugEnabled) {
debug(
"disconnect peer: peer",
Buffer.from(peerId).toString("hex"),
"removed from swarm",
Buffer.from(swarm.infoHash).toString("hex"),
);
}

this.#peersContext.delete(peerId);

delete (peerContext.socketContext as unknown as UnknownObject)[peerId];
}

public get swarms(): ReadonlyMap<string, { peers: readonly PeerContext[] }> {
return this.#swarms;
}
Expand All @@ -82,7 +123,7 @@ export class FastTracker implements Tracker {
} else if (event === "started") {
this.processAnnounce(json, peer);
} else if (event === "stopped") {
this.processStop(json, peer);
this.processStop(json);
} else if (event === "completed") {
this.processAnnounce(json, peer, true);
} else {
Expand All @@ -95,120 +136,94 @@ export class FastTracker implements Tracker {
}
}

public disconnectPeer(peerSocket: SocketContext): void {
const hashesToDelete: string[] = [];

for (const infoHash in peerSocket) {
const swarm = (peerSocket as unknown as UnknownObject)[infoHash];

if (!(swarm instanceof Swarm)) continue;

for (const peerId of peerSocket.peerIdsOnSocket) {
const peer = this.#peersContext.get(peerId);
if (peer === undefined) continue;

const isPeerRemoved = swarm.removePeer(peer);

if (debugEnabled && isPeerRemoved) {
debug(
"disconnect peer: peer",
Buffer.from(peerId).toString("hex"),
"removed from swarm",
Buffer.from(infoHash).toString("hex"),
);
}
}

hashesToDelete.push(infoHash);

if (swarm.peers.length === 0) {
if (debugEnabled) {
debug(
"disconnect peer: swarm removed (empty)",
Buffer.from(swarm.infoHash).toString("hex"),
);
}
this.#swarms.delete(swarm.infoHash);
}
}

if (hashesToDelete.length === 0) return;

for (const infoHash of hashesToDelete) {
delete (peerSocket as unknown as UnknownObject)[infoHash];
}
public disconnectPeersFromSocket(peerSocket: SocketContext): void {
for (const peerId in peerSocket) {
const peerContext = (peerSocket as unknown as UnknownObject)[
peerId
] as PeerContext;

for (const peerId of peerSocket.peerIdsOnSocket) {
this.#peersContext.delete(peerId);
if (peerContext.peerId === undefined) continue;
this.removePeer(peerContext);
}

peerSocket.peerIdsOnSocket.clear();
}

private processAnnounce(
json: UnknownObject,
peer: SocketContext,
completed = false,
): void {
const infoHash = json.info_hash;
const infoHash = json.info_hash as string;
const peerId = json.peer_id as string;
let swarm: unknown = undefined;

if (peer.peerIdsOnSocket === undefined) {
peer.peerIdsOnSocket = new Set([peerId]);
}
peer.peerIdsOnSocket.add(peerId);
let swarm: Swarm | undefined;
const isPeerCompleted = completed || json.left === 0;

const peerContext: PeerContext = {
id: peerId,
sendMessage: peer.sendMessage,
ws: peer.ws,
lastAccessed: performance.now(),
};
let peerContext = (peer as unknown as UnknownObject)[peerId] as
| PeerContext
| undefined;

this.#peersContext.set(peerId, peerContext);
if (peerContext === undefined) {
peerContext = this.#peersContext.get(peerId);

swarm = (peer as unknown as UnknownObject)[infoHash as string];
if (peerContext !== undefined) return;

const isPeerCompleted = completed || json.left === 0;
peerContext = {
peerId,
sendMessage: peer.sendMessage,
socketContext: peer,
lastAccessed: performance.now(),
swarmInfoHash: infoHash,
};

if (swarm === undefined) {
swarm = this.addPeerToSwarm(peer, peerContext, infoHash, isPeerCompleted);
} else if (swarm instanceof Swarm) {
if (debugEnabled) {
debug(
"announce: peer",
Buffer.from(peerId).toString("hex"),
"in swarm",
Buffer.from(infoHash as string).toString("hex"),

(peer as unknown as UnknownObject)[peerId] = peerContext;
this.#peersContext.set(peerId, peerContext);
} else if (peerContext.peerId === peerId) {

Check failure on line 181 in lib/fast-tracker.ts

View workflow job for this annotation

GitHub Actions / build-pr

test/announce.test.ts > announce > should send offers to peers in a swarm

TypeError: Cannot read properties of null (reading 'peerId') ❯ FastTracker.processAnnounce lib/fast-tracker.ts:181:28 ❯ FastTracker.processMessage lib/fast-tracker.ts:124:14 ❯ test/announce.test.ts:181:13

Check failure on line 181 in lib/fast-tracker.ts

View workflow job for this annotation

GitHub Actions / build-pr

test/announce.test.ts > announce > should process answer messages

TypeError: Cannot read properties of null (reading 'peerId') ❯ FastTracker.processAnnounce lib/fast-tracker.ts:181:28 ❯ FastTracker.processMessage lib/fast-tracker.ts:124:14 ❯ test/announce.test.ts:464:13
peerContext.lastAccessed = performance.now();

if (infoHash !== peerContext.swarmInfoHash) {
const oldSwarm = this.#swarms.get(peerContext.swarmInfoHash);

if (oldSwarm === undefined) {
throw new TrackerError("announce: old swarm is undefined");
}

oldSwarm.removePeer(peerContext);

this.removeEmptySwarm(oldSwarm);

peerContext.swarmInfoHash = infoHash;
swarm = this.addPeerToSwarm(
peer,
peerContext,
infoHash,
isPeerCompleted,
);
}
} else {
swarm = this.#swarms.get(peerContext.swarmInfoHash);

if (isPeerCompleted) {
swarm.setCompleted(peerContext);
if (swarm === undefined) {
throw new TrackerError("announce: swarm is undefined");
}

if (isPeerCompleted) swarm.setCompleted(peerContext);
}
} else {
throw new TrackerError("announce: illegal info_hash field");
throw new TrackerError("announce: peerId mismatch");
}

peer.sendMessage(
{
action: "announce",
interval: this.settings.announceInterval,
info_hash: infoHash,
complete: (swarm as Swarm).completedCount,
incomplete:
(swarm as Swarm).peers.length - (swarm as Swarm).completedCount,
complete: swarm.completedCount,
incomplete: swarm.peers.length - swarm.completedCount,
},
peer,
);

this.sendOffersToPeers(
json,
(swarm as Swarm).peers,
peerContext,
infoHash as string,
);
this.sendOffersToPeers(json, swarm.peers, peerContext, infoHash);
}

private addPeerToSwarm(
Expand Down Expand Up @@ -238,14 +253,13 @@ export class FastTracker implements Tracker {
if (debugEnabled) {
debug(
"announce: peer",
Buffer.from(peerContext.id).toString("hex"),
Buffer.from(peerContext.peerId).toString("hex"),
"added to swarm",
Buffer.from(infoHash as string).toString("hex"),
);
}

swarm.addPeer(peerContext, completed);
(peer as unknown as UnknownObject)[infoHash as string] = swarm;
return swarm;
}

Expand Down Expand Up @@ -286,8 +300,8 @@ export class FastTracker implements Tracker {
if (toPeer !== peer) {
sendOffer(
offersIterator.next().value,
peer.id,
toPeer as unknown as SocketContext,
peer.peerId,
toPeer.socketContext,
infoHash,
);
}
Expand All @@ -302,12 +316,7 @@ export class FastTracker implements Tracker {
if (toPeer === peer) {
i--; // do one more iteration
} else {
sendOffer(
offers[i],
peer.id,
toPeer as unknown as SocketContext,
infoHash,
);
sendOffer(offers[i], peer.peerId, toPeer.socketContext, infoHash);
}

peerIndex++;
Expand All @@ -331,7 +340,7 @@ export class FastTracker implements Tracker {
}

delete json.to_peer_id;
toPeer.sendMessage(json, toPeer as unknown as SocketContext);
toPeer.sendMessage(json, toPeer.socketContext);

if (debugEnabled) {
debug(
Expand All @@ -343,42 +352,13 @@ export class FastTracker implements Tracker {
}
}

private processStop(json: UnknownObject, peerSocket: SocketContext): void {
private processStop(json: UnknownObject): void {
const peerId = json.peer_id as string;
const infoHash = json.info_hash as string;

const peer = this.#peersContext.get(peerId);
if (peer === undefined) return;

const swarm = (peerSocket as unknown as UnknownObject)[infoHash];

if (!(swarm instanceof Swarm)) {
throw new TrackerError("stop: peer is not in the swarm");
}

swarm.removePeer(peer);
delete (peerSocket as unknown as UnknownObject)[infoHash];

if (debugEnabled) {
debug(
"stop: peer",
Buffer.from(peerId).toString("hex"),
"removed from swarm",
Buffer.from(infoHash).toString("hex"),
);
}

if (swarm.peers.length === 0) {
if (debugEnabled) {
debug(
"stop: swarm removed (empty)",
Buffer.from(swarm.infoHash).toString("hex"),
);
}
this.#swarms.delete(swarm.infoHash);
}

this.#peersContext.delete(peerId);
this.removePeer(peer);
}

private processScrape(json: UnknownObject, peer: SocketContext): void {
Expand Down Expand Up @@ -455,17 +435,15 @@ class Swarm {
if (this.completedPeers === undefined) {
this.completedPeers = new Set();
}
this.completedPeers.add(peer.id);
this.completedPeers.add(peer.peerId);
this.completedCount++;
}
}

public removePeer(peer: PeerContext): boolean {
public removePeer(peer: PeerContext) {
const index = this.#peers.indexOf(peer);

if (index === -1) return false;

if (this.completedPeers?.delete(peer.id) === true) {
if (this.completedPeers?.delete(peer.peerId) === true) {
this.completedCount--;
}

Expand All @@ -474,17 +452,15 @@ class Swarm {
if (index < this.#peers.length) {
this.#peers[index] = last;
}

return true;
}

public setCompleted(peer: PeerContext): void {
if (this.completedPeers === undefined) {
this.completedPeers = new Set();
}

if (!this.completedPeers.has(peer.id)) {
this.completedPeers.add(peer.id);
if (!this.completedPeers.has(peer.peerId)) {
this.completedPeers.add(peer.peerId);
this.completedCount++;
}
}
Expand Down
Loading

0 comments on commit c5f6a33

Please sign in to comment.