Skip to content

Commit

Permalink
feat(shuttle): Parametrize hub connection timeouts for HubSubscriber …
Browse files Browse the repository at this point in the history
…& MessageReconciliation (#2367)

## Why is this change needed?

Running the latest version of Shuttle with its fixed 5000ms hub
connection timeout for reconciliation has resulted in a lot of timeout
failures even against a seemingly healthy hub. I've increased the
default connection timeout of MessageReconciliation from 5000ms to
30000ms to match HubSubscriber's and parameterized both.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [ ] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [x] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR introduces parameterized connection timeouts for the
`HubSubscriber` and `MessageReconciliation` classes, allowing for more
flexible timeout settings in handling unresponsive connections.

### Detailed summary
- Added `connectionTimeout` parameter to the constructors of
`MessageReconciliation` and `BaseHubSubscriber`, defaulting to 30000
milliseconds.
- Updated timeout handling in `MessageReconciliation` to use
`connectionTimeout`.
- Adjusted timeout in tests to account for new configurable timeout
settings.

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
tybook authored Oct 16, 2024
1 parent 91b7720 commit 727041c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/tricky-badgers-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

feat(shuttle): Parametrize hub connection timeouts for HubSubscriber and MessageReconciliation
12 changes: 6 additions & 6 deletions packages/shuttle/src/shuttle.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,8 @@ describe("shuttle", () => {
);
},
getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
// force wait for 2 seconds to trigger failure
await new Promise((resolve) => setTimeout(resolve, 5000));
// force wait longer than MessageReconciliation's configured timeout to trigger failure
await new Promise((resolve) => setTimeout(resolve, 550));
return ok(
MessagesResponse.create({
messages: [
Expand Down Expand Up @@ -743,8 +743,8 @@ describe("shuttle", () => {
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
// force wait for 2 seconds to trigger failure
await new Promise((resolve) => setTimeout(resolve, 5000));
// force wait longer than MessageReconciliation's configured timeout to trigger failure
await new Promise((resolve) => setTimeout(resolve, 550));
return ok(
MessagesResponse.create({
messages: [],
Expand All @@ -767,7 +767,7 @@ describe("shuttle", () => {
};

// Only include 2 of the 3 messages in the time window
const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log);
const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log, 500);
const messagesOnHub: Message[] = [];
const messagesInDb: {
hash: Uint8Array;
Expand All @@ -791,7 +791,7 @@ describe("shuttle", () => {
startTimestamp,
),
).rejects.toThrow();
}, 15000); // Need to make sure this is long enough to handle the timeout termination
}, 5000); // Need to make sure this is long enough to handle the timeout termination

test("marks messages as pruned", async () => {
const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } });
Expand Down
7 changes: 5 additions & 2 deletions packages/shuttle/src/shuttle/hubSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class BaseHubSubscriber extends HubSubscriber {
private stream: ClientReadableStream<HubEvent> | null = null;
private totalShards: number | undefined;
private shardIndex: number | undefined;
private connectionTimeout: number; // milliseconds

constructor(
label: string,
Expand All @@ -63,6 +64,7 @@ export class BaseHubSubscriber extends HubSubscriber {
eventTypes?: HubEventType[],
totalShards?: number,
shardIndex?: number,
connectionTimeout = 30000,
) {
super();
this.label = label;
Expand All @@ -71,6 +73,7 @@ export class BaseHubSubscriber extends HubSubscriber {
this.totalShards = totalShards;
this.shardIndex = shardIndex;
this.eventTypes = eventTypes || DEFAULT_EVENT_TYPES;
this.connectionTimeout = connectionTimeout;
}

public override stop() {
Expand Down Expand Up @@ -156,13 +159,13 @@ export class BaseHubSubscriber extends HubSubscriber {
// Do not allow hanging unresponsive connections to linger:
let cancel = setTimeout(() => {
this.destroy();
}, 30000);
}, this.connectionTimeout);
for await (const event of stream) {
await this.processHubEvent(event);
clearTimeout(cancel);
cancel = setTimeout(() => {
this.destroy();
}, 30000);
}, this.connectionTimeout);
}
clearTimeout(cancel);
// biome-ignore lint/suspicious/noExplicitAny: error catching
Expand Down
9 changes: 7 additions & 2 deletions packages/shuttle/src/shuttle/messageReconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ export class MessageReconciliation {
private stream: ClientDuplexStream<StreamFetchRequest, StreamFetchResponse> | undefined;
private db: DB;
private log: pino.Logger;
private connectionTimeout: number; // milliseconds

constructor(client: HubRpcClient, db: DB, log: pino.Logger) {
constructor(client: HubRpcClient, db: DB, log: pino.Logger, connectionTimeout = 30000) {
this.client = client;
this.db = db;
this.log = log;
this.connectionTimeout = connectionTimeout;
this.establishStream();
}

Expand Down Expand Up @@ -182,7 +184,10 @@ export class MessageReconciliation {
const id = randomUUID();
const result = new Promise<HubResult<MessagesResponse>>((resolve) => {
// Do not allow hanging unresponsive connections to linger:
const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), 5000);
const cancel = setTimeout(
() => resolve(err(new HubError("unavailable", "server timeout"))),
this.connectionTimeout,
);

if (!this.stream) {
fallback()
Expand Down

0 comments on commit 727041c

Please sign in to comment.