[ch] backfill queries (#5624)
convert queries to CH, query both rockset and clickhouse in all queries
clee2000 authored Sep 10, 2024
1 parent 4e2a8e1 commit 332b5a9
Showing 1 changed file with 212 additions and 34 deletions.
torchci/scripts/backfillJobs.mjs
@@ -4,6 +4,7 @@

 import { DynamoDB } from "@aws-sdk/client-dynamodb";
 import { DynamoDBDocument } from "@aws-sdk/lib-dynamodb";
+import { createClient } from "@clickhouse/client";
 import { createAppAuth } from "@octokit/auth-app";
 import rockset from "@rockset/client";
 import { App, Octokit } from "octokit";
@@ -17,6 +18,26 @@ function getDynamoClient() {
   );
 }

+function getClickhouseClient() {
+  return createClient({
+    host: process.env.CLICKHOUSE_HUD_USER_URL,
+    username: process.env.CLICKHOUSE_HUD_USER_USERNAME,
+    password: process.env.CLICKHOUSE_HUD_USER_PASSWORD,
+  });
+}
+
+export async function queryClickhouse(query, params) {
+  const clickhouseClient = getClickhouseClient();
+  const res = await clickhouseClient.query({
+    query,
+    format: "JSONEachRow",
+    query_params: params,
+    clickhouse_settings: { output_format_json_quote_64bit_integers: 0 },
+  });
+
+  return await res.json();
+}
+
 async function getOctokit(owner, repo) {
   let privateKey = process.env.PRIVATE_KEY;
   privateKey = Buffer.from(privateKey, "base64").toString();
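
Note on the new helper: besides raw SQL, queryClickhouse can bind values
server-side through ClickHouse's {name: Type} placeholder syntax via
query_params. A minimal usage sketch (the key and limit values here are
invented for illustration):

  const rows = await queryClickhouse(
    `SELECT id, conclusion
     FROM workflow_job FINAL
     WHERE dynamoKey = {key: String}
     LIMIT {limit: UInt32}`,
    { key: "pytorch/pytorch/12345", limit: 10 }
  );
  // With output_format_json_quote_64bit_integers: 0, Int64 columns such as id
  // come back as JavaScript numbers instead of strings; convenient here, but
  // it would lose precision for values above Number.MAX_SAFE_INTEGER.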
@@ -83,30 +104,44 @@ async function backfillWorkflowJob(
     console.log(`Failed to find job id ${id}: ${error}`);
     console.log(`Marking job id ${id} as incomplete`);
     console.log(`Querying dynamo entry for job id ${id}`);
-    const dynamoEntry = await client.queries.query({
-      sql: {
-        query: `
+
+    let rows = await queryClickhouse(
+      `SELECT * FROM workflow_job j final WHERE j.dynamoKey = '${dynamo_key}'`,
+      {}
+    );
+
+    if (rows.length === 0) {
+      console.log(`No entry found in CH for job id ${id}`);
+      rows = (
+        await client.queries.query({
+          sql: {
+            query: `
 SELECT
     *
 FROM
     workflow_job j
 WHERE
     j.dynamoKey = '${dynamo_key}'
 `,
-      },
-    });
-    if (dynamoEntry.results.length === 0) {
-      console.log(`No dynamo entry found for job id ${id}`);
+          },
+        })
+      ).results;
+    }
+
+    if (rows.length === 0) {
+      console.log(`No entry found in Rockset for job id ${id}`);
       return;
     }
-    const result = dynamoEntry.results[0];
+
+    const result = rows[0];

     console.log(`Writing job ${id} to DynamoDB:`);
     const thing = {
       TableName: table,
       Item: {
         ...result,
         data_quality: "incomplete",
-        backfill: "not-found",
+        backfill: false,
       },
     };
     console.log(thing);
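
Condensed, this hunk's new control flow is a read-with-fallback: try
ClickHouse first, fall back to Rockset only when CH has no row yet, and give
up if neither store knows the job (a hypothetical distillation, not code from
the commit; chQuery and rocksetQuery stand in for the queries above):

  let rows = await queryClickhouse(chQuery, {}); // primary: ClickHouse
  if (rows.length === 0) {
    rows = (await client.queries.query({ sql: { query: rocksetQuery } }))
      .results; // fallback: Rockset
  }
  if (rows.length === 0) return; // in neither store: give up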
@@ -116,9 +151,10 @@
 }

 console.log("::group::Backfilling jobs without a conclusion...");
-const jobsWithNoConclusion = await client.queries.query({
-  sql: {
-    query: `
+const jobsWithNoConclusion = (
+  await client.queries.query({
+    sql: {
+      query: `
 SELECT
     j.id,
     w.repository.name as repo_name,
@@ -137,20 +173,70 @@ ORDER BY
     j._event_time ASC
 LIMIT 200
 `,
-  },
-});
+    },
+  })
+).results;
+
+const chJobsWithNoConclusion = await queryClickhouse(
+  `with pending_jobs as (
+    SELECT
+      j.id as id,
+      j.run_id as run_id,
+      j.dynamoKey as dynamoKey
+    FROM
+      workflow_job j final
+    WHERE
+      j.conclusion = ''
+      and j.backfill
+      and j.id in (
+        select
+          id
+        from
+          materialized_views.workflow_job_by_started_at
+        where
+          started_at < CURRENT_TIMESTAMP() - INTERVAL 3 HOUR
+          and started_at > CURRENT_TIMESTAMP() - INTERVAL 1 DAY
+      )
+    ORDER BY
+      j.started_at ASC
+    LIMIT
+      200
+  )
+  SELECT
+    j.id as id,
+    w.repository. 'name' as repo_name,
+    w.repository. 'owner'.'login' as owner,
+    j.dynamoKey as dynamo_key
+  FROM
+    workflow_run w final
+    INNER JOIN pending_jobs j on j.run_id = w.id
+  WHERE
+    w.id in (
+      select
+        run_id
+      from
+        pending_jobs
+    )
+    and w.repository. 'name' = 'pytorch'
+  `,
+  {}
+);
+// Add jobs that CH found but Rockset didn't
+for (const job of chJobsWithNoConclusion) {
+  const { dynamo_key } = job;
+  if (jobsWithNoConclusion.find((job) => job.dynamo_key === dynamo_key)) {
+    continue;
+  } else {
+    jobsWithNoConclusion.push(job);
+  }
+}

 // Await in a loop???
 // Yes: when GitHub has outages and fails to deliver webhooks en masse, we can
 // get rate limited while trying to backfill. Since backfilling is not
 // latency-sensitive, it's fine to just processed them serially to ensure we
 // make forward progress.
-for (const {
-  id,
-  repo_name,
-  owner,
-  dynamo_key,
-} of jobsWithNoConclusion.results) {
+for (const { id, repo_name, owner, dynamo_key } of jobsWithNoConclusion) {
   // Some jobs just never get marked completed due to bugs in the GHA backend.
   // Just skip them.
   await backfillWorkflowJob(
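
The merge loop added here deduplicates by dynamo_key with a linear find(),
which is quadratic overall but harmless at LIMIT 200. If the limits ever grow,
a Set keeps the merge linear (a hypothetical alternative, not part of the
commit):

  const seen = new Set(jobsWithNoConclusion.map((j) => j.dynamo_key));
  for (const job of chJobsWithNoConclusion) {
    if (!seen.has(job.dynamo_key)) {
      seen.add(job.dynamo_key);
      jobsWithNoConclusion.push(job);
    }
  }

The same pattern would apply to the chQueuedJobs and chUnclassifiedJobs merges
below.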
@@ -166,9 +252,10 @@ console.log("::endgroup::");
 console.log("::group::Backfilling queued jobs...");
 // Also try to backfill queued jobs specifically, with a tighter time bound.
 // This is so our queue time stats are as accurate as possible.
-const queuedJobs = await client.queries.query({
-  sql: {
-    query: `
+const queuedJobs = (
+  await client.queries.query({
+    sql: {
+      query: `
 SELECT
     j.id,
     w.repository.name as repo_name,
@@ -188,11 +275,58 @@ ORDER BY
     j._event_time ASC
 LIMIT 200
 `,
-  },
-});
+    },
+  })
+).results;
+const chQueuedJobs = await queryClickhouse(
+  `with pending_jobs as (
+    SELECT
+      j.id as id,
+      j.run_id as run_id,
+      j.dynamoKey as dynamoKey
+    FROM
+      workflow_job j final
+    WHERE
+      j.status = 'queued'
+      and j.backfill
+      and j.id in (
+        select
+          id
+        from
+          materialized_views.workflow_job_by_started_at
+        where
+          started_at < CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE
+          and started_at > CURRENT_TIMESTAMP() - INTERVAL 7 DAY
+      )
+  )
+  SELECT
+    j.id as id,
+    w.repository. 'name' as repo_name,
+    w.repository. 'owner'.'login' as owner,
+    j.dynamoKey as dynamo_key
+  FROM
+    workflow_run w final
+    INNER JOIN pending_jobs j on j.run_id = w.id
+  WHERE
+    w.status != 'completed'
+    AND w.repository. 'name' = 'pytorch'
+    AND w.id in (select run_id from pending_jobs)
+  LIMIT
+    200`,
+  {}
+);
+// Add jobs that CH found but Rockset didn't
+for (const job of chQueuedJobs) {
+  const { dynamo_key } = job;
+  if (queuedJobs.find((job) => job.dynamo_key === dynamo_key)) {
+    continue;
+  } else {
+    queuedJobs.push(job);
+  }
+}

 // See above for why we're awaiting in a loop.
-for (const { id, repo_name, owner, dynamo_key } of queuedJobs.results) {
+for (const { id, repo_name, owner, dynamo_key } of queuedJobs) {
   await backfillWorkflowJob(
     id,
     repo_name,
@@ -204,9 +338,10 @@ for (const { id, repo_name, owner, dynamo_key } of queuedJobs.results) {
 console.log("::endgroup::");

 console.log("::group::Backfill unclassified logs...");
-const unclassifiedJobs = await client.queries.query({
-  sql: {
-    query: `
+const unclassifiedJobs = (
+  await client.queries.query({
+    sql: {
+      query: `
 select
     j.id,
 from
@@ -223,12 +358,55 @@ where
     and w.head_repository.full_name = 'pytorch/pytorch'
 AND j.backfill IS NULL
 `,
-  },
-});
-console.log(
-  `There are ${unclassifiedJobs.results.length} jobs with unclassified logs`
-);
+    },
+  })
+).results;
+const chUnclassifiedJobs = await queryClickhouse(
+  `with jobs as (
+    select
+      j.id as id,
+      j.run_id as run_id
+    from
+      default .workflow_job j final
+    where
+      j.torchci_classification.line = ''
+      and j.backfill
+      and j.conclusion in [ 'failure',
+      'cancelled' ]
+      and j.name != 'ciflow_should_run'
+      and j.name != 'generate-test-matrix'
+      and j.completed_at > now() - Interval 30 MINUTE
+      and j.completed_at < now() - Interval 5 MINUTE
+  )
+  select
+    j.id as id
+  from
+    default .workflow_run w final
+    join jobs j on w.id = j.run_id
+  where
+    w.event != 'workflow_run'
+    and w.event != 'repository_dispatch'
+    and w.head_repository. 'full_name' = 'pytorch/pytorch'
+    and w.id in (
+      select
+        run_id
+      from
+        jobs
+    )`,
+  {}
+);
-for (const job of unclassifiedJobs.results) {
+// Add jobs that CH found but Rockset didn't
+for (const job of chUnclassifiedJobs) {
+  const { id } = job;
+  if (unclassifiedJobs.find((job) => job.id === id)) {
+    continue;
+  } else {
+    unclassifiedJobs.push(job);
+  }
+}
+
+console.log(`There are ${unclassifiedJobs.length} jobs with unclassified logs`);
+for (const job of unclassifiedJobs) {
   console.log(`Attempting to backfill log of ${job.id}`);
   try {
     const a = await request(
