feat(queue-worker/crawl): solidify redirect behaviour
mogery committed Dec 10, 2024
1 parent ce460a3 commit d9e017e
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions apps/api/src/services/queue-worker.ts
@@ -41,6 +41,12 @@ import { getRateLimiterPoints } from "./rate-limiter";
 import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
 configDotenv();
 
+class RacedRedirectError extends Error {
+  constructor() {
+    super("Raced redirect error")
+  }
+}
+
 const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
 
 const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
@@ -434,8 +440,17 @@ async function processJob(job: Job & { id: string }, token: string) {
         // Remove the old URL from visited unique due to checking for limit
         // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL)
         await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc));
+
+        const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
+        const p2 = generateURLPermutations(normalizeURL(doc.metadata.sourceURL, sc));
+
+        // In crawls, we should only crawl a redirected page once, no matter how many times it is redirected to, or if it's been discovered by the crawler before.
+        // This can prevent flakiness with race conditions.
         // Lock the new URL
-        await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+        const lockRes = await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+        if (job.data.crawlerOptions !== null && !lockRes && JSON.stringify(p1) !== JSON.stringify(p2)) {
+          throw new RacedRedirectError();
+        }
       }
 
       logger.debug("Logging job to DB...");
@@ -455,7 +470,7 @@ async function processJob(job: Job & { id: string }, token: string) {
       }, true);
 
       logger.debug("Declaring job as done...");
-      await addCrawlJobDone(job.data.crawl_id, job.id);
+      await addCrawlJobDone(job.data.crawl_id, job.id, true);
 
       if (job.data.crawlerOptions !== null) {
         if (!sc.cancelled) {
@@ -520,7 +535,11 @@ async function processJob(job: Job & { id: string }, token: string) {
   } catch (error) {
     const isEarlyTimeout = error instanceof Error && error.message === "timeout";
 
-    if (!isEarlyTimeout) {
+    if (isEarlyTimeout) {
+      logger.error(`🐂 Job timed out ${job.id}`);
+    } else if (error instanceof RacedRedirectError) {
+      logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
+    } else {
       logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });
 
       Sentry.captureException(error, {
@@ -537,8 +556,6 @@
       if (error.stack) {
         logger.error(error.stack);
       }
-    } else {
-      logger.error(`🐂 Job timed out ${job.id}`);
     }
 
     const data = {
@@ -573,7 +590,7 @@
     const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
 
     logger.debug("Declaring job as done...");
-    await addCrawlJobDone(job.data.crawl_id, job.id);
+    await addCrawlJobDone(job.data.crawl_id, job.id, false);
 
     logger.debug("Logging job to DB...");
     await logJob({

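To make the intent of the new guard easier to follow outside the diff, here is a minimal, self-contained TypeScript sketch of the same idea: when a crawled page redirects, the worker tries to lock the destination URL, and if that lock is already held by another job and the destination is not merely a trivial permutation of the source URL, the job is abandoned as a raced redirect instead of producing a duplicate result. Note that `tryLock` and `urlPermutations` below are hypothetical stand-ins for the repository's `lockURL` and `generateURLPermutations` helpers (which are Redis-backed), not the actual implementations.

```typescript
// Hypothetical, self-contained sketch of the raced-redirect guard introduced in
// this commit. tryLock and urlPermutations stand in for the repository's
// lockURL / generateURLPermutations helpers; they are not the real implementations.

class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
  }
}

// In the real worker this state lives in Redis (per-crawl visited keys);
// an in-memory Set keeps the example runnable on its own.
const visited = new Set<string>();

// Returns true only for the first caller that locks a given URL.
function tryLock(url: string): boolean {
  if (visited.has(url)) return false;
  visited.add(url);
  return true;
}

// Trivial variations of a URL (scheme, www, trailing slash) that should be
// treated as the same page.
function urlPermutations(url: string): string[] {
  const u = new URL(url);
  const host = u.hostname.replace(/^www\./, "");
  const path = u.pathname.replace(/\/$/, "") || "/";
  return ["http", "https"].flatMap((proto) => [
    `${proto}://${host}${path}`,
    `${proto}://www.${host}${path}`,
  ]);
}

// Mirrors the guard in processJob: a redirect whose destination is already
// locked and is not just a permutation of the source counts as a raced redirect.
function handleRedirect(sourceURL: string, finalURL: string): void {
  const p1 = urlPermutations(finalURL);
  const p2 = urlPermutations(sourceURL);
  const locked = tryLock(finalURL);
  if (!locked && JSON.stringify(p1) !== JSON.stringify(p2)) {
    throw new RacedRedirectError();
  }
}

// First job locks the redirect target; a second job racing into the same
// target fails silently instead of duplicating the page.
handleRedirect("https://example.com/old", "https://example.com/new");
try {
  handleRedirect("https://example.com/other", "https://example.com/new");
} catch (e) {
  if (e instanceof RacedRedirectError) {
    console.warn("redirect raced, dropping job silently");
  }
}
```

The usage at the bottom shows the behaviour the commit relies on: the first job to reach a redirect target wins the lock, and any later job raced into the same target fails with `RacedRedirectError`, which the worker logs as a warning and swallows rather than reporting as a crawl error.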