diff --git a/jobs.ts b/jobs.ts index b86cd95..2b4388a 100644 --- a/jobs.ts +++ b/jobs.ts @@ -26,7 +26,7 @@ function formatDuration(durationInMs: number): string { return `${inSeconds.toFixed(1)}s`; } -export class JobsObserver { +export class JobsChecker { private readonly noisClient: CosmWasmClient; private readonly gateway: string; @@ -42,7 +42,11 @@ export class JobsObserver { * Checks gateway for pending jobs and returns the rounds of those jobs as a list */ public async check(): Promise { - const query = { jobs_desc: { offset: null, limit: 50 } }; + const queryLimit = 4; + + // Use jobs_asc because with jobs_desc all entries in the result might be in the (far) future, + // leading to cases where the unprocesses jobs in the past are not processed anymore. + const query = { jobs_asc: { offset: null, limit: queryLimit } }; const { jobs }: JobsResponse = await this.noisClient.queryContractSmart(this.gateway, query); if (jobs.length === 0) return []; // Nothing to do for us @@ -51,7 +55,7 @@ export class JobsObserver { const due = timeOfRound(round) - Date.now(); return `#${round} (due ${formatDuration(due)})`; }); - console.log(`Jobs pending for rounds: %c${roundInfos.join(", ")}`, "color: orange"); + console.log(`Top ${queryLimit} pending jobs: %c${roundInfos.join(", ")}`, "color: orange"); return rounds; } } diff --git a/main.ts b/main.ts index 4d2a355..21db27d 100644 --- a/main.ts +++ b/main.ts @@ -24,7 +24,7 @@ import { sleep, watch, } from "./deps.ts"; -import { JobsObserver } from "./jobs.ts"; +import { JobsChecker } from "./jobs.ts"; import { Submitter } from "./submitter.ts"; import { queryIsAllowlisted, queryIsIncentivized } from "./drand_contract.ts"; import { Config } from "./config.ts"; @@ -59,6 +59,15 @@ function getNextSignData(): SignerData { return out; } +/** + * If this is set to false, the bot will only watch the drand chain and check for + * each round **once** if it is incentivised. Everything that is marked as incentivised + * too late or is still unprocessed for whatever reason will not be submitted. + * + * It's not recommended to change this value for anything else than development. + */ +const cleanupOldJobs = true; + if (import.meta.main) { const { default: config }: { default: Config } = await import("./config.json", { assert: { type: "json" }, @@ -89,6 +98,7 @@ if (import.meta.main) { const wallet = await DirectSecp256k1HdWallet.fromMnemonic(mnemonic, { prefix: config.prefix }); const [firstAccount] = await wallet.getAccounts(); + console.log(`Connecting to ${rpcEndpoint} ...`); const cometClient = await connectComet(rpcEndpoint); const client = await SigningCosmWasmClient.createWithSigner(cometClient, wallet, { gasPrice: GasPrice.fromString(config.gasPrice), @@ -148,7 +158,10 @@ if (import.meta.main) { })(), ]); - const jobs = new JobsObserver(client, gatewayAddress); + let jobsChecker: JobsChecker | undefined; + if (cleanupOldJobs) { + jobsChecker = new JobsChecker(client, gatewayAddress); + } // Initialize local sign data await resetSignData(); @@ -208,22 +221,29 @@ if (import.meta.main) { const didSubmit = await submitter.handlePublishedBeacon(beacon); - const processJobs = (rounds: number[]): void => { - if (!rounds.length) return; - const past = rounds.filter((r) => r <= n); - const future = rounds.filter((r) => r > n); - console.log( - `Past: %o, Future: %o`, - past, - future, - ); - submitter.handlePastRoundsWithJobs(past); - }; + if (cleanupOldJobs) { + const processJobs = (rounds: number[]): void => { + if (!rounds.length) return; + const past = rounds.filter((r) => r <= n); + const future = rounds.filter((r) => r > n); + console.log( + `Past (${past.length}): %o, Future (${future.length}): %o`, + past, + future, + ); + submitter.handlePastRoundsWithJobs(past); + }; + + const check = () => { + assert(jobsChecker); + jobsChecker.check().then(processJobs, (err) => console.error(err)); + }; - // Check jobs every 1.5s, shifted 1200ms from the drand receiving - const shift = 1200; - setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift); - setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift + 1500); + // Check jobs every 1.5s, shifted 1200ms from the drand receiving + const shift = 1200; + setTimeout(check, shift); + setTimeout(check, shift + 1500); + } if (didSubmit) { // Some seconds after the submission when things are idle, check and log diff --git a/submitter.ts b/submitter.ts index 46e8c4b..6a97537 100644 --- a/submitter.ts +++ b/submitter.ts @@ -64,7 +64,13 @@ export class Submitter { /** Handle jobs for which the round should be public */ public async handlePastRoundsWithJobs(rounds: number[]): Promise { - await Promise.all(rounds.map((round) => this.handlePastRoundWithJobs(round))); + // Do not process those jobs in parallel in order to avoid sequence mismatches. + + // We process from new to old for no good reason + const sorted = rounds.sort().reverse(); + for (const round of sorted) { + await this.handlePastRoundWithJobs(round); + } } private async handlePastRoundWithJobs(round: number): Promise { @@ -106,6 +112,12 @@ export class Submitter { } } + /** + * Takes a beacon, submits it through a transaction and wait for inclusion in a block. + * + * Do not call this multiple times in parallel in order to avoid race conditions and + * account sequence mismatches. + */ private async submit(beacon: Pick) { if (this.submitted.has(beacon.round)) return; this.submitted.add(beacon.round);