Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix jobs checking #30

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function formatDuration(durationInMs: number): string {
return `${inSeconds.toFixed(1)}s`;
}

export class JobsObserver {
export class JobsChecker {
private readonly noisClient: CosmWasmClient;
private readonly gateway: string;

Expand All @@ -42,7 +42,11 @@ export class JobsObserver {
* Checks gateway for pending jobs and returns the rounds of those jobs as a list
*/
public async check(): Promise<number[]> {
const query = { jobs_desc: { offset: null, limit: 50 } };
const queryLimit = 4;

// Use jobs_asc because with jobs_desc all entries in the result might be in the (far) future,
// leading to cases where the unprocesses jobs in the past are not processed anymore.
const query = { jobs_asc: { offset: null, limit: queryLimit } };
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This jobs_desc -> jobs_asc is the main change here

const { jobs }: JobsResponse = await this.noisClient.queryContractSmart(this.gateway, query);
if (jobs.length === 0) return []; // Nothing to do for us

Expand All @@ -51,7 +55,7 @@ export class JobsObserver {
const due = timeOfRound(round) - Date.now();
return `#${round} (due ${formatDuration(due)})`;
});
console.log(`Jobs pending for rounds: %c${roundInfos.join(", ")}`, "color: orange");
console.log(`Top ${queryLimit} pending jobs: %c${roundInfos.join(", ")}`, "color: orange");
return rounds;
}
}
54 changes: 37 additions & 17 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
sleep,
watch,
} from "./deps.ts";
import { JobsObserver } from "./jobs.ts";
import { JobsChecker } from "./jobs.ts";
import { Submitter } from "./submitter.ts";
import { queryIsAllowlisted, queryIsIncentivized } from "./drand_contract.ts";
import { Config } from "./config.ts";
Expand Down Expand Up @@ -59,6 +59,15 @@ function getNextSignData(): SignerData {
return out;
}

/**
* If this is set to false, the bot will only watch the drand chain and check for
* each round **once** if it is incentivised. Everything that is marked as incentivised
* too late or is still unprocessed for whatever reason will not be submitted.
*
* It's not recommended to change this value for anything else than development.
*/
const cleanupOldJobs = true;

if (import.meta.main) {
const { default: config }: { default: Config } = await import("./config.json", {
assert: { type: "json" },
Expand Down Expand Up @@ -89,6 +98,7 @@ if (import.meta.main) {

const wallet = await DirectSecp256k1HdWallet.fromMnemonic(mnemonic, { prefix: config.prefix });
const [firstAccount] = await wallet.getAccounts();
console.log(`Connecting to ${rpcEndpoint} ...`);
const cometClient = await connectComet(rpcEndpoint);
const client = await SigningCosmWasmClient.createWithSigner(cometClient, wallet, {
gasPrice: GasPrice.fromString(config.gasPrice),
Expand Down Expand Up @@ -148,7 +158,10 @@ if (import.meta.main) {
})(),
]);

const jobs = new JobsObserver(client, gatewayAddress);
let jobsChecker: JobsChecker | undefined;
if (cleanupOldJobs) {
jobsChecker = new JobsChecker(client, gatewayAddress);
}

// Initialize local sign data
await resetSignData();
Expand Down Expand Up @@ -208,22 +221,29 @@ if (import.meta.main) {

const didSubmit = await submitter.handlePublishedBeacon(beacon);

const processJobs = (rounds: number[]): void => {
if (!rounds.length) return;
const past = rounds.filter((r) => r <= n);
const future = rounds.filter((r) => r > n);
console.log(
`Past: %o, Future: %o`,
past,
future,
);
submitter.handlePastRoundsWithJobs(past);
};
if (cleanupOldJobs) {
const processJobs = (rounds: number[]): void => {
if (!rounds.length) return;
const past = rounds.filter((r) => r <= n);
const future = rounds.filter((r) => r > n);
console.log(
`Past (${past.length}): %o, Future (${future.length}): %o`,
past,
future,
);
submitter.handlePastRoundsWithJobs(past);
};

const check = () => {
assert(jobsChecker);
jobsChecker.check().then(processJobs, (err) => console.error(err));
};

// Check jobs every 1.5s, shifted 1200ms from the drand receiving
const shift = 1200;
setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift);
setTimeout(() => jobs.check().then(processJobs, (err) => console.error(err)), shift + 1500);
// Check jobs every 1.5s, shifted 1200ms from the drand receiving
const shift = 1200;
setTimeout(check, shift);
setTimeout(check, shift + 1500);
}

if (didSubmit) {
// Some seconds after the submission when things are idle, check and log
Expand Down
14 changes: 13 additions & 1 deletion submitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ export class Submitter {

/** Handle jobs for which the round should be public */
public async handlePastRoundsWithJobs(rounds: number[]): Promise<void> {
await Promise.all(rounds.map((round) => this.handlePastRoundWithJobs(round)));
// Do not process those jobs in parallel in order to avoid sequence mismatches.

// We process from new to old for no good reason
const sorted = rounds.sort().reverse();
for (const round of sorted) {
await this.handlePastRoundWithJobs(round);
}
}

private async handlePastRoundWithJobs(round: number): Promise<void> {
Expand Down Expand Up @@ -106,6 +112,12 @@ export class Submitter {
}
}

/**
* Takes a beacon, submits it through a transaction and wait for inclusion in a block.
*
* Do not call this multiple times in parallel in order to avoid race conditions and
* account sequence mismatches.
*/
private async submit(beacon: Pick<RandomnessBeacon, "round" | "signature">) {
if (this.submitted.has(beacon.round)) return;
this.submitted.add(beacon.round);
Expand Down
Loading