Skip to content

Commit

Permalink
Improve resume reliability of restored runs (#1458)
Browse files Browse the repository at this point in the history
* replay ready for resume

* clear replay state between attempts

* add changeset
  • Loading branch information
nicktrn authored Nov 5, 2024
1 parent adf5970 commit 5368bcf
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changeset/sixty-donuts-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Improve resume reliability by replaying ready signal of restored workers
151 changes: 110 additions & 41 deletions packages/cli-v3/src/entryPoints/deploy-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class ProdWorker {
idempotencyKey: string;
}
| undefined;
/**
 * Marker for an in-flight READY_FOR_RESUME replay.
 *
 * `#readyForResume()` assigns this right before it starts emitting, and its
 * retry loop stops as soon as this is `undefined` or holds a different
 * `idempotencyKey`. Assigning `undefined` therefore cancels the replay.
 */
private readyForResumeReplay:
| {
// Random UUID generated per `#readyForResume()` call; identifies which call owns the replay.
idempotencyKey: string;
// Wait reason being resumed from; copied from `nextResumeAfter` when the replay starts.
type: WaitReason;
}
| undefined;

#httpPort: number;
#httpServer: ReturnType<typeof createServer>;
Expand Down Expand Up @@ -365,10 +371,18 @@ class ProdWorker {
/**
 * Resets the worker's per-attempt state so the next attempt starts clean.
 *
 * Clears the pause/execution flags and the attempt identifiers, then drops
 * every pending replay marker so replay state does not carry over between
 * attempts. The retry loop in `#readyForResume` exits once
 * `readyForResumeReplay` is unset, so clearing it here cancels that loop.
 * NOTE(review): the other replay markers presumably cancel their own loops
 * the same way — confirm against the code that reads them.
 *
 * Declared `async` although nothing in the body awaits.
 */
async #prepareForRetry() {
// Clear state for retrying
this.paused = false;
this.nextResumeAfter = undefined;
this.waitForPostStart = false;
this.executing = false;
this.attemptFriendlyId = undefined;
this.attemptNumber = undefined;

// Clear replay state (markers checked by the replay/retry loops)
this.waitForTaskReplay = undefined;
this.waitForBatchReplay = undefined;
this.readyForLazyAttemptReplay = undefined;
this.durationResumeFallback = undefined;
this.readyForResumeReplay = undefined;
}

// MARK: CHECKPOINT PREP
Expand Down Expand Up @@ -405,13 +419,16 @@ class ProdWorker {
this.waitForPostStart = false;

this.durationResumeFallback = undefined;
this.readyForResumeReplay = undefined;

this._taskRunProcess?.waitCompletedNotification();
}

async #readyForLazyAttempt() {
const idempotencyKey = randomUUID();

logger.log("ready for lazy attempt", { idempotencyKey });

this.readyForLazyAttemptReplay = {
idempotencyKey,
};
Expand All @@ -420,7 +437,7 @@ class ProdWorker {
// ..but we also have to be fast to avoid failing the task due to missing heartbeat
for await (const { delay, retry } of defaultBackoff.min(10).maxRetries(7)) {
if (retry > 0) {
logger.log("retrying ready for lazy attempt", { retry });
logger.log("retrying ready for lazy attempt", { retry, idempotencyKey });
}

this.#coordinatorSocket.socket.emit("READY_FOR_LAZY_ATTEMPT", {
Expand Down Expand Up @@ -453,6 +470,93 @@ class ProdWorker {
this.#failRun(this.runId, "Failed to receive execute request in a reasonable time");
}

/**
 * Announces to the coordinator that this paused worker is ready to resume,
 * re-sending the announcement on a backoff schedule until it is cancelled.
 *
 * Requires `nextResumeAfter`, `attemptFriendlyId` and `attemptNumber` to be
 * set; if any is missing, an unrecoverable error is emitted and nothing is
 * sent. Otherwise `readyForResumeReplay` is stamped with a fresh idempotency
 * key and READY_FOR_RESUME (v2) is emitted once per backoff step. After each
 * wait the loop stops if the marker was cleared or replaced by a newer call.
 * If the backoff runs out first, the method simply returns.
 */
async #readyForResume() {
  const idempotencyKey = randomUUID();

  // Snapshot the fields once: the same values are logged, validated and sent.
  const { nextResumeAfter, attemptFriendlyId, attemptNumber } = this;

  logger.log("readyForResume()", {
    nextResumeAfter,
    attemptFriendlyId,
    attemptNumber,
    idempotencyKey,
  });

  if (!nextResumeAfter) {
    logger.error("Missing next resume reason", { status: this.#status });
    this.#emitUnrecoverableError(
      "NoNextResume",
      "Next resume reason not set while resuming from paused state"
    );
    return;
  }

  if (!attemptFriendlyId) {
    logger.error("Missing attempt friendly ID", { status: this.#status });
    this.#emitUnrecoverableError(
      "NoAttemptId",
      "Attempt ID not set while resuming from paused state"
    );
    return;
  }

  if (!attemptNumber) {
    logger.error("Missing attempt number", { status: this.#status });
    this.#emitUnrecoverableError(
      "NoAttemptNumber",
      "Attempt number not set while resuming from paused state"
    );
    return;
  }

  // Mark this call as the owner of the replay; clearing or replacing the
  // marker elsewhere is what stops the loop below.
  this.readyForResumeReplay = { idempotencyKey, type: nextResumeAfter };

  // Retry if we don't receive RESUME_AFTER_DEPENDENCY or RESUME_AFTER_DURATION in a reasonable time
  // ..but we also have to be fast to avoid failing the task due to missing heartbeat
  for await (const { delay: backoffDelay, retry: retryCount } of defaultBackoff
    .min(10)
    .maxRetries(7)) {
    if (retryCount > 0) {
      logger.log("retrying ready for resume", { retry: retryCount, idempotencyKey });
    }

    // Every send carries the values captured above, even if the fields change meanwhile.
    this.#coordinatorSocket.socket.emit("READY_FOR_RESUME", {
      version: "v2",
      attemptFriendlyId,
      attemptNumber,
      type: nextResumeAfter,
    });

    await timeout(backoffDelay.milliseconds);

    const activeReplay = this.readyForResumeReplay;

    if (!activeReplay) {
      // Marker was cleared while we were waiting.
      logger.log("replay ready for resume cancelled, discarding", {
        idempotencyKey,
      });
      return;
    }

    if (activeReplay.idempotencyKey !== idempotencyKey) {
      // A newer #readyForResume() call took over the replay.
      logger.log("replay ready for resume idempotency key mismatch, discarding", {
        idempotencyKey,
        newIdempotencyKey: activeReplay.idempotencyKey,
      });
      return;
    }
  }
}

/** Emits READY_FOR_CHECKPOINT (v1) on the coordinator socket. */
#readyForCheckpoint() {
  const { socket } = this.#coordinatorSocket;

  socket.emit("READY_FOR_CHECKPOINT", { version: "v1" });
}
Expand Down Expand Up @@ -630,6 +734,7 @@ class ProdWorker {
this.paused = false;
this.nextResumeAfter = undefined;
this.waitForPostStart = false;
this.readyForResumeReplay = undefined;

for (let i = 0; i < completions.length; i++) {
const completion = completions[i];
Expand Down Expand Up @@ -845,46 +950,7 @@ class ProdWorker {
}

if (this.paused) {
if (!this.nextResumeAfter) {
logger.error("Missing next resume reason", { status: this.#status });

this.#emitUnrecoverableError(
"NoNextResume",
"Next resume reason not set while resuming from paused state"
);

return;
}

if (!this.attemptFriendlyId) {
logger.error("Missing attempt friendly ID", { status: this.#status });

this.#emitUnrecoverableError(
"NoAttemptId",
"Attempt ID not set while resuming from paused state"
);

return;
}

if (!this.attemptNumber) {
logger.error("Missing attempt number", { status: this.#status });

this.#emitUnrecoverableError(
"NoAttemptNumber",
"Attempt number not set while resuming from paused state"
);

return;
}

socket.emit("READY_FOR_RESUME", {
version: "v2",
attemptFriendlyId: this.attemptFriendlyId,
attemptNumber: this.attemptNumber,
type: this.nextResumeAfter,
});

await this.#readyForResume();
return;
}

Expand Down Expand Up @@ -1293,6 +1359,9 @@ class ProdWorker {
attemptNumber: this.attemptNumber,
waitForTaskReplay: this.waitForTaskReplay,
waitForBatchReplay: this.waitForBatchReplay,
readyForLazyAttemptReplay: this.readyForLazyAttemptReplay,
durationResumeFallback: this.durationResumeFallback,
readyForResumeReplay: this.readyForResumeReplay,
};
}

Expand Down

0 comments on commit 5368bcf

Please sign in to comment.