From c5f6a33efb850231478b347c16bf6015c3404f92 Mon Sep 17 00:00:00 2001 From: DimaDemchenko Date: Thu, 20 Jun 2024 16:25:12 +0300 Subject: [PATCH] refactor: tracker announce & disconnect logic --- lib/fast-tracker.ts | 254 ++++++++++++++++++++------------------------ lib/tracker.ts | 10 +- lib/uws-tracker.ts | 4 +- 3 files changed, 121 insertions(+), 147 deletions(-) diff --git a/lib/fast-tracker.ts b/lib/fast-tracker.ts index 1674899..626ce1f 100644 --- a/lib/fast-tracker.ts +++ b/lib/fast-tracker.ts @@ -43,7 +43,7 @@ export class FastTracker implements Tracker { public constructor(settings?: Partial) { this.settings = { maxOffers: 20, - announceInterval: 120, + announceInterval: 20, ...settings, }; this.startClearPeersInterval(); @@ -54,15 +54,56 @@ export class FastTracker implements Tracker { this.clearPeersInterval = setInterval(() => { const now = performance.now(); for (const peer of this.#peersContext.values()) { - if (now - peer.lastAccessed > this.settings.announceInterval * 2) { - // TODO: disconnect only current peer - this.disconnectPeer(peer.ws as unknown as SocketContext); + if ( + now - peer.lastAccessed > + this.settings.announceInterval * 2 * 1000 + ) { + this.removePeer(peer); } } }, this.settings.announceInterval * 1000); } } + private removeEmptySwarm(swarm: Swarm) { + if (swarm.peers.length === 0) { + if (debugEnabled) { + debug( + "disconnect peer: swarm removed (empty)", + Buffer.from(swarm.infoHash).toString("hex"), + ); + } + this.#swarms.delete(swarm.infoHash); + } + } + + private removePeer(peerContext: PeerContext) { + const swarm = this.#swarms.get(peerContext.swarmInfoHash); + + if (swarm === undefined) { + throw new TrackerError("disconnect peer: swarm is undefined"); + } + + swarm.removePeer(peerContext); + + this.removeEmptySwarm(swarm); + + const peerId = peerContext.peerId; + + if (debugEnabled) { + debug( + "disconnect peer: peer", + Buffer.from(peerId).toString("hex"), + "removed from swarm", + Buffer.from(swarm.infoHash).toString("hex"), + ); + } + + this.#peersContext.delete(peerId); + + delete (peerContext.socketContext as unknown as UnknownObject)[peerId]; + } + public get swarms(): ReadonlyMap { return this.#swarms; } @@ -82,7 +123,7 @@ export class FastTracker implements Tracker { } else if (event === "started") { this.processAnnounce(json, peer); } else if (event === "stopped") { - this.processStop(json, peer); + this.processStop(json); } else if (event === "completed") { this.processAnnounce(json, peer, true); } else { @@ -95,54 +136,15 @@ export class FastTracker implements Tracker { } } - public disconnectPeer(peerSocket: SocketContext): void { - const hashesToDelete: string[] = []; - - for (const infoHash in peerSocket) { - const swarm = (peerSocket as unknown as UnknownObject)[infoHash]; - - if (!(swarm instanceof Swarm)) continue; - - for (const peerId of peerSocket.peerIdsOnSocket) { - const peer = this.#peersContext.get(peerId); - if (peer === undefined) continue; - - const isPeerRemoved = swarm.removePeer(peer); - - if (debugEnabled && isPeerRemoved) { - debug( - "disconnect peer: peer", - Buffer.from(peerId).toString("hex"), - "removed from swarm", - Buffer.from(infoHash).toString("hex"), - ); - } - } - - hashesToDelete.push(infoHash); - - if (swarm.peers.length === 0) { - if (debugEnabled) { - debug( - "disconnect peer: swarm removed (empty)", - Buffer.from(swarm.infoHash).toString("hex"), - ); - } - this.#swarms.delete(swarm.infoHash); - } - } - - if (hashesToDelete.length === 0) return; - - for (const infoHash of hashesToDelete) { - delete (peerSocket as unknown as UnknownObject)[infoHash]; - } + public disconnectPeersFromSocket(peerSocket: SocketContext): void { + for (const peerId in peerSocket) { + const peerContext = (peerSocket as unknown as UnknownObject)[ + peerId + ] as PeerContext; - for (const peerId of peerSocket.peerIdsOnSocket) { - this.#peersContext.delete(peerId); + if (peerContext.peerId === undefined) continue; + this.removePeer(peerContext); } - - peerSocket.peerIdsOnSocket.clear(); } private processAnnounce( @@ -150,45 +152,64 @@ export class FastTracker implements Tracker { peer: SocketContext, completed = false, ): void { - const infoHash = json.info_hash; + const infoHash = json.info_hash as string; const peerId = json.peer_id as string; - let swarm: unknown = undefined; - - if (peer.peerIdsOnSocket === undefined) { - peer.peerIdsOnSocket = new Set([peerId]); - } - peer.peerIdsOnSocket.add(peerId); + let swarm: Swarm | undefined; + const isPeerCompleted = completed || json.left === 0; - const peerContext: PeerContext = { - id: peerId, - sendMessage: peer.sendMessage, - ws: peer.ws, - lastAccessed: performance.now(), - }; + let peerContext = (peer as unknown as UnknownObject)[peerId] as + | PeerContext + | undefined; - this.#peersContext.set(peerId, peerContext); + if (peerContext === undefined) { + peerContext = this.#peersContext.get(peerId); - swarm = (peer as unknown as UnknownObject)[infoHash as string]; + if (peerContext !== undefined) return; - const isPeerCompleted = completed || json.left === 0; + peerContext = { + peerId, + sendMessage: peer.sendMessage, + socketContext: peer, + lastAccessed: performance.now(), + swarmInfoHash: infoHash, + }; - if (swarm === undefined) { swarm = this.addPeerToSwarm(peer, peerContext, infoHash, isPeerCompleted); - } else if (swarm instanceof Swarm) { - if (debugEnabled) { - debug( - "announce: peer", - Buffer.from(peerId).toString("hex"), - "in swarm", - Buffer.from(infoHash as string).toString("hex"), + + (peer as unknown as UnknownObject)[peerId] = peerContext; + this.#peersContext.set(peerId, peerContext); + } else if (peerContext.peerId === peerId) { + peerContext.lastAccessed = performance.now(); + + if (infoHash !== peerContext.swarmInfoHash) { + const oldSwarm = this.#swarms.get(peerContext.swarmInfoHash); + + if (oldSwarm === undefined) { + throw new TrackerError("announce: old swarm is undefined"); + } + + oldSwarm.removePeer(peerContext); + + this.removeEmptySwarm(oldSwarm); + + peerContext.swarmInfoHash = infoHash; + swarm = this.addPeerToSwarm( + peer, + peerContext, + infoHash, + isPeerCompleted, ); - } + } else { + swarm = this.#swarms.get(peerContext.swarmInfoHash); - if (isPeerCompleted) { - swarm.setCompleted(peerContext); + if (swarm === undefined) { + throw new TrackerError("announce: swarm is undefined"); + } + + if (isPeerCompleted) swarm.setCompleted(peerContext); } } else { - throw new TrackerError("announce: illegal info_hash field"); + throw new TrackerError("announce: peerId mismatch"); } peer.sendMessage( @@ -196,19 +217,13 @@ export class FastTracker implements Tracker { action: "announce", interval: this.settings.announceInterval, info_hash: infoHash, - complete: (swarm as Swarm).completedCount, - incomplete: - (swarm as Swarm).peers.length - (swarm as Swarm).completedCount, + complete: swarm.completedCount, + incomplete: swarm.peers.length - swarm.completedCount, }, peer, ); - this.sendOffersToPeers( - json, - (swarm as Swarm).peers, - peerContext, - infoHash as string, - ); + this.sendOffersToPeers(json, swarm.peers, peerContext, infoHash); } private addPeerToSwarm( @@ -238,14 +253,13 @@ export class FastTracker implements Tracker { if (debugEnabled) { debug( "announce: peer", - Buffer.from(peerContext.id).toString("hex"), + Buffer.from(peerContext.peerId).toString("hex"), "added to swarm", Buffer.from(infoHash as string).toString("hex"), ); } swarm.addPeer(peerContext, completed); - (peer as unknown as UnknownObject)[infoHash as string] = swarm; return swarm; } @@ -286,8 +300,8 @@ export class FastTracker implements Tracker { if (toPeer !== peer) { sendOffer( offersIterator.next().value, - peer.id, - toPeer as unknown as SocketContext, + peer.peerId, + toPeer.socketContext, infoHash, ); } @@ -302,12 +316,7 @@ export class FastTracker implements Tracker { if (toPeer === peer) { i--; // do one more iteration } else { - sendOffer( - offers[i], - peer.id, - toPeer as unknown as SocketContext, - infoHash, - ); + sendOffer(offers[i], peer.peerId, toPeer.socketContext, infoHash); } peerIndex++; @@ -331,7 +340,7 @@ export class FastTracker implements Tracker { } delete json.to_peer_id; - toPeer.sendMessage(json, toPeer as unknown as SocketContext); + toPeer.sendMessage(json, toPeer.socketContext); if (debugEnabled) { debug( @@ -343,42 +352,13 @@ export class FastTracker implements Tracker { } } - private processStop(json: UnknownObject, peerSocket: SocketContext): void { + private processStop(json: UnknownObject): void { const peerId = json.peer_id as string; - const infoHash = json.info_hash as string; const peer = this.#peersContext.get(peerId); if (peer === undefined) return; - const swarm = (peerSocket as unknown as UnknownObject)[infoHash]; - - if (!(swarm instanceof Swarm)) { - throw new TrackerError("stop: peer is not in the swarm"); - } - - swarm.removePeer(peer); - delete (peerSocket as unknown as UnknownObject)[infoHash]; - - if (debugEnabled) { - debug( - "stop: peer", - Buffer.from(peerId).toString("hex"), - "removed from swarm", - Buffer.from(infoHash).toString("hex"), - ); - } - - if (swarm.peers.length === 0) { - if (debugEnabled) { - debug( - "stop: swarm removed (empty)", - Buffer.from(swarm.infoHash).toString("hex"), - ); - } - this.#swarms.delete(swarm.infoHash); - } - - this.#peersContext.delete(peerId); + this.removePeer(peer); } private processScrape(json: UnknownObject, peer: SocketContext): void { @@ -455,17 +435,15 @@ class Swarm { if (this.completedPeers === undefined) { this.completedPeers = new Set(); } - this.completedPeers.add(peer.id); + this.completedPeers.add(peer.peerId); this.completedCount++; } } - public removePeer(peer: PeerContext): boolean { + public removePeer(peer: PeerContext) { const index = this.#peers.indexOf(peer); - if (index === -1) return false; - - if (this.completedPeers?.delete(peer.id) === true) { + if (this.completedPeers?.delete(peer.peerId) === true) { this.completedCount--; } @@ -474,8 +452,6 @@ class Swarm { if (index < this.#peers.length) { this.#peers[index] = last; } - - return true; } public setCompleted(peer: PeerContext): void { @@ -483,8 +459,8 @@ class Swarm { this.completedPeers = new Set(); } - if (!this.completedPeers.has(peer.id)) { - this.completedPeers.add(peer.id); + if (!this.completedPeers.has(peer.peerId)) { + this.completedPeers.add(peer.peerId); this.completedCount++; } } diff --git a/lib/tracker.ts b/lib/tracker.ts index 5bd2e9b..e9d25c6 100644 --- a/lib/tracker.ts +++ b/lib/tracker.ts @@ -14,25 +14,23 @@ * limitations under the License. */ -import { WebSocket } from "uWebSockets.js"; - export interface SocketContext { sendMessage: (json: object, peer: SocketContext) => void; - peerIdsOnSocket: Set; } export interface PeerContext { - id: string; + peerId: string; sendMessage: (json: object, peer: SocketContext) => void; - ws: WebSocket; + socketContext: SocketContext; lastAccessed: number; + swarmInfoHash: string; } export interface Tracker { readonly swarms: ReadonlyMap; readonly settings: object; processMessage: (json: object, peer: SocketContext) => void; - disconnectPeer: (peer: SocketContext) => void; + disconnectPeersFromSocket: (peer: SocketContext) => void; } export class TrackerError extends Error {} diff --git a/lib/uws-tracker.ts b/lib/uws-tracker.ts index 36cf054..81a0f88 100644 --- a/lib/uws-tracker.ts +++ b/lib/uws-tracker.ts @@ -276,7 +276,7 @@ export class UWebSocketsTracker { ); } - response.upgrade>( + response.upgrade>( { sendMessage, }, @@ -336,7 +336,7 @@ export class UWebSocketsTracker { this.webSocketsCount--; if (ws.getUserData().sendMessage !== undefined) { - this.tracker.disconnectPeer(ws as unknown as SocketContext); + this.tracker.disconnectPeersFromSocket(ws as unknown as SocketContext); } debugWebSockets("closed with code", code);