Skip to content

Commit

Permalink
Merge pull request #30 from noislabs/job-checking
Browse files Browse the repository at this point in the history
Fix jobs checking
  • Loading branch information
webmaster128 committed Apr 15, 2024
2 parents 8aeb822 + 4586dc9 commit 41113c3
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
10 changes: 7 additions & 3 deletions jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function formatDuration(durationInMs: number): string {
return `${inSeconds.toFixed(1)}s`;
}

export class JobsObserver {
export class JobsChecker {
private readonly noisClient: CosmWasmClient;
private readonly gateway: string;

Expand All @@ -42,7 +42,11 @@ export class JobsObserver {
* Checks gateway for pending jobs and returns the rounds of those jobs as a list
*/
public async check(): Promise<number[]> {
const query = { jobs_desc: { offset: null, limit: 50 } };
const queryLimit = 4;

// Use jobs_asc because with jobs_desc all entries in the result might be in the (far) future,
// leading to cases where the unprocesses jobs in the past are not processed anymore.
const query = { jobs_asc: { offset: null, limit: queryLimit } };
const { jobs }: JobsResponse = await this.noisClient.queryContractSmart(this.gateway, query);
if (jobs.length === 0) return []; // Nothing to do for us

Expand All @@ -51,7 +55,7 @@ export class JobsObserver {
const due = timeOfRound(round) - Date.now();
return `#${round} (due ${formatDuration(due)})`;
});
console.log(`Jobs pending for rounds: %c${roundInfos.join(", ")}`, "color: orange");
console.log(`Top ${queryLimit} pending jobs: %c${roundInfos.join(", ")}`, "color: orange");
return rounds;
}
}
54 changes: 37 additions & 17 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
sleep,
watch,
} from "./deps.ts";
import { JobsObserver } from "./jobs.ts";
import { JobsChecker } from "./jobs.ts";
import { Submitter } from "./submitter.ts";
import { queryIsAllowlisted, queryIsIncentivized } from "./drand_contract.ts";
import { Config } from "./config.ts";
Expand Down Expand Up @@ -59,6 +59,15 @@ function getNextSignData(): SignerData {
return out;
}

/**
* If this is set to false, the bot will only watch the drand chain and check for
* each round **once** if it is incentivised. Everything that is marked as incentivised
* too late or is still unprocessed for whatever reason will not be submitted.
*
* It's not recommended to change this value for anything else than development.
*/
const cleanupOldJobs = true;

if (import.meta.main) {
const { default: config }: { default: Config } = await import("./config.json", {
assert: { type: "json" },
Expand Down Expand Up @@ -89,6 +98,7 @@ if (import.meta.main) {

const wallet = await DirectSecp256k1HdWallet.fromMnemonic(mnemonic, { prefix: config.prefix });
const [firstAccount] = await wallet.getAccounts();
console.log(`Connecting to ${rpcEndpoint} ...`);
const cometClient = await connectComet(rpcEndpoint);
const client = await SigningCosmWasmClient.createWithSigner(cometClient, wallet, {
gasPrice: GasPrice.fromString(config.gasPrice),
Expand Down Expand Up @@ -148,7 +158,10 @@ if (import.meta.main) {
})(),
]);

const jobs = new JobsObserver(client, gatewayAddress);
let jobsChecker: JobsChecker | undefined;
if (cleanupOldJobs) {
jobsChecker = new JobsChecker(client, gatewayAddress);
}

// Initialize local sign data
await resetSignData();
Expand Down Expand Up @@ -208,22 +221,29 @@ if (import.meta.main) {

const didSubmit = await submitter.handlePublishedBeacon(beacon);

const processJobs = (rounds: number[]): void => {
if (!rounds.length) return;
const past = rounds.filter((r) => r <= n);
const future = rounds.filter((r) => r > n);
console.log(
`Past: %o, Future: %o`,
past,
future,
);
submitter.handlePastRoundsWithJobs(past);
};
if (cleanupOldJobs) {
const processJobs = (rounds: number[]): void => {
if (!rounds.length) return;
const past = rounds.filter((r) => r <= n);
const future = rounds.filter((r) => r > n);
console.log(
`Past (${past.length}): %o, Future (${future.length}): %o`,
past,
future,
);
submitter.handlePastRoundsWithJobs(past);
};

const check = () => {
assert(jobsChecker);
jobsChecker.check().then(processJobs, (err) => console.error(err));
};

// Check jobs every 1.5s, shifted 1200ms from the drand receiving
const shift = 1200;
setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift);
setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift + 1500);
// Check jobs every 1.5s, shifted 1200ms from the drand receiving
const shift = 1200;
setTimeout(check, shift);
setTimeout(check, shift + 1500);
}

if (didSubmit) {
// Some seconds after the submission when things are idle, check and log
Expand Down
14 changes: 13 additions & 1 deletion submitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ export class Submitter {

/** Handle jobs for which the round should be public */
public async handlePastRoundsWithJobs(rounds: number[]): Promise<void> {
await Promise.all(rounds.map((round) => this.handlePastRoundWithJobs(round)));
// Do not process those jobs in parallel in order to avoid sequence mismatches.

// We process from new to old for no good reason
const sorted = rounds.sort().reverse();
for (const round of sorted) {
await this.handlePastRoundWithJobs(round);
}
}

private async handlePastRoundWithJobs(round: number): Promise<void> {
Expand Down Expand Up @@ -106,6 +112,12 @@ export class Submitter {
}
}

/**
* Takes a beacon, submits it through a transaction and wait for inclusion in a block.
*
* Do not call this multiple times in parallel in order to avoid race conditions and
* account sequence mismatches.
*/
private async submit(beacon: Pick<RandomnessBeacon, "round" | "signature">) {
if (this.submitted.has(beacon.round)) return;
this.submitted.add(beacon.round);
Expand Down

0 comments on commit 41113c3

Please sign in to comment.