Skip to content

Commit

Permalink
remove timeout and use only open peers
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Sep 30, 2024
1 parent 116fd9f commit c2e277c
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions packages/core/src/lib/stream_manager/stream_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import { Logger } from "@waku/utils";

import { selectConnection } from "./utils.js";

const CONNECTION_TIMEOUT = 5_000;

export class StreamManager {
private readonly log: Logger;

Expand All @@ -34,7 +32,7 @@ export class StreamManager {
this.streamPool.delete(peerId);
await scheduledStream;

const stream = this.getStreamForCodec(peer.id);
const stream = this.getOpenStreamForCodec(peer.id);

if (stream) {
this.log.info(
Expand Down Expand Up @@ -109,7 +107,7 @@ export class StreamManager {
return;
}

const stream = this.getStreamForCodec(peer.id);
const stream = this.getOpenStreamForCodec(peer.id);

if (stream) {
return;
Expand All @@ -123,19 +121,15 @@ export class StreamManager {
`Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);

const timeoutPromise = new Promise<undefined>((resolve) =>
setTimeout(() => resolve(undefined), CONNECTION_TIMEOUT)
);

const streamPromise = Promise.race([
this.createStreamWithLock(peer),
timeoutPromise
]);
// abandon previous attempt
if (this.streamPool.has(peer.id.toString())) {
this.streamPool.delete(peer.id.toString());
}

this.streamPool.set(peer.id.toString(), streamPromise);
this.streamPool.set(peer.id.toString(), this.createStreamWithLock(peer));
}

private getStreamForCodec(peerId: PeerId): Stream | undefined {
private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
const connection: Connection | undefined = this.getConnections(peerId).find(
(c) => c.status === "open"
);
Expand All @@ -148,6 +142,13 @@ export class StreamManager {
(s) => s.protocol === this.multicodec
);

const isStreamUnusable = ["done", "closed", "closing"].includes(
stream?.writeStatus || ""
);
if (isStreamUnusable) {
return;
}

return stream;
}
}

0 comments on commit c2e277c

Please sign in to comment.