Skip to content

Commit

Permalink
wip: split out raw_code and raw_flow from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Nov 5, 2024
1 parent 77735d8 commit f008baa
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down migration: remove the job_params side table introduced by the matching
-- up migration. IF EXISTS makes the rollback idempotent, so re-running the
-- down step (or running it against a partially-applied schema) does not fail.
DROP TABLE IF EXISTS job_params;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Side table that splits the potentially large job payload columns
-- (raw_code / raw_flow) out of the hot `queue` table. Rows are keyed by the
-- job id and are joined back via (id, workspace_id) in the job queries.
CREATE TABLE job_params (
id UUID PRIMARY KEY, -- same id as queue.id (one row per queued job)
raw_code TEXT, -- inline script source; NULL when job has none
raw_flow jsonb NULL, -- inline flow definition; NULL for non-flow jobs
tag VARCHAR(50), -- NOTE(review): presumably mirrors queue.tag — confirm intended use
workspace_id VARCHAR(50) -- used together with id in joins/deletes
);
201 changes: 128 additions & 73 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use quick_cache::sync::Cache;
use serde_json::value::RawValue;
use sqlx::Pool;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
#[cfg(feature = "prometheus")]
use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -600,7 +601,7 @@ async fn get_flow_job_debug_info(
Extension(db): Extension<DB>,
Path((w_id, id)): Path<(String, Uuid)>,
) -> error::Result<Response> {
let job = get_queued_job(&id, w_id.as_str(), &db).await?;
let job = get_queued_job_ex(&id, w_id.as_str(), &db, true).await?;
if let Some(job) = job {
let is_flow = job.is_flow();
if job.is_flow_step || !is_flow {
Expand Down Expand Up @@ -692,17 +693,22 @@ fn generate_get_job_query(no_logs: bool, table: &str) -> String {
} else {
format!("right(concat(coalesce({table}.logs, ''), job_logs.logs), 20000)")
};
let join = if no_logs {
let mut join = if no_logs {
"".to_string()
} else {
format!("LEFT JOIN job_logs ON {table}.id = job_logs.job_id")
};
if table == "queue" {
join = format!("{join} LEFT JOIN job_params jp ON jp.id = {table}.id AND jp.workspace_id = {table}.workspace_id")
}
let additional_fields = if table == "completed_job" {
"duration_ms,
success,
result,
deleted,
is_skipped,
raw_code,
raw_flow,
result->'wm_labels' as labels,
CASE WHEN result is null or pg_column_size(result) < 90000 THEN result ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as result"
} else {
Expand All @@ -716,23 +722,42 @@ fn generate_get_job_query(no_logs: bool, table: &str) -> String {
visible_to_owner,
root_job,
leaf_jobs,
tag,
concurrent_limit,
concurrency_time_window_s,
timeout,
flow_step_id,
cache_ttl
cache_ttl,
coalesce(jp.raw_code, queue.raw_code) as raw_code,
coalesce(jp.raw_flow, queue.raw_flow) as raw_flow
"
};
return format!("SELECT
id, {table}.workspace_id, parent_job, created_by, {table}.created_at, started_at, script_hash, script_path,
{table}.id, {table}.workspace_id, parent_job, created_by, {table}.created_at, started_at, script_hash, script_path,
CASE WHEN args is null or pg_column_size(args) < 90000 THEN args ELSE '{{\"reason\": \"WINDMILL_TOO_BIG\"}}'::jsonb END as args,
{log_expr} as logs, raw_code, canceled, canceled_by, canceled_reason, job_kind,
schedule_path, permissioned_as, flow_status, raw_flow, is_flow_step, language,
raw_lock, email, visible_to_owner, mem_peak, tag, priority, {additional_fields}
{log_expr} as logs, canceled, canceled_by, canceled_reason, job_kind,
schedule_path, permissioned_as, flow_status, is_flow_step, language,
raw_lock, email, visible_to_owner, mem_peak, {table}.tag, priority, {additional_fields}
FROM {table}
{join}
WHERE id = $1 AND {table}.workspace_id = $2");
WHERE {table}.id = $1 AND {table}.workspace_id = $2");
}
/// Look up a single queued job by id and workspace, returning it as a
/// [`QueuedJobEx`] (the queue row plus the `raw_code`/`raw_flow` columns
/// pulled in by the prepared query).
///
/// `no_logs` picks the cached query variant that omits the (potentially
/// large) log expression. Yields `Ok(None)` when no matching row exists.
pub async fn get_queued_job_ex(
    job_id: &Uuid,
    workspace_id: &str,
    db: &DB,
    no_logs: bool,
) -> error::Result<Option<QueuedJobEx>> {
    // Select the prepared query text up front so the sqlx call chain below
    // stays linear and easy to scan.
    let query = if no_logs {
        &*GET_QUEUED_JOB_QUERY_NO_LOGS
    } else {
        &*GET_QUEUED_JOB_QUERY
    };
    let job = sqlx::query_as::<_, QueuedJobEx>(query)
        .bind(job_id)
        .bind(workspace_id)
        .fetch_optional(db)
        .await?;
    Ok(job)
}
pub async fn get_job_internal(
db: &DB,
Expand All @@ -752,39 +777,22 @@ pub async fn get_job_internal(
.fetch_optional(db)
.await?
.map(Job::CompletedJob);

if let Some(cjob) = cjob_maybe {
Ok(match cjob {
Job::CompletedJob(cjob) => {
if opt_authed.is_some_and(|x| x.is_none()) && cjob.created_by != "anonymous" {
return Err(Error::BadRequest(
"As a non logged in user, you can only see jobs ran by anonymous users"
.to_string(),
));
}
Job::CompletedJobWithFormattedResult(format_completed_job_result(cjob))
}
cjob => cjob,
})

let job = if let Some(cjob) = cjob_maybe {
cjob
} else {
let job_o = sqlx::query_as::<_, QueuedJob>(if no_logs {
&*GET_QUEUED_JOB_QUERY_NO_LOGS
} else {
&*GET_QUEUED_JOB_QUERY
})
.bind(job_id)
.bind(workspace_id)
.fetch_optional(db)
.await?
.map(Job::QueuedJob);
let job: Job = not_found_if_none(job_o, "Job", job_id.to_string())?;
if opt_authed.is_some_and(|x| x.is_none()) && job.created_by() != "anonymous" {
return Err(Error::BadRequest(
"As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
));
}
Ok(job)
let job_maybe = get_queued_job_ex(&job_id, workspace_id, db, no_logs)
.await?
.map(Job::QueuedJob);
not_found_if_none(job_maybe, "Job", job_id.to_string())?
};

if opt_authed.is_some_and(|x| x.is_none()) && job.created_by() != "anonymous" {
return Err(Error::BadRequest(
"As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
));
}
Ok(job)
}

#[cfg(all(feature = "enterprise", feature = "parquet"))]
Expand Down Expand Up @@ -1381,37 +1389,37 @@ async fn cancel_jobs(
, tag
, priority
)
SELECT workspace_id
, id
, parent_job
, created_by
, created_at
SELECT q.workspace_id
, q.id
, q.parent_job
, q.created_by
, q.created_at
, now()
, 0
, false
, script_hash
, script_path
, args
, q.script_hash
, q.script_path
, q.args
, $4
, raw_code
, raw_lock
, coalesce(jp.raw_code, q.raw_code)
, q.raw_lock
, true
, $1
, canceled_reason
, job_kind
, schedule_path
, permissioned_as
, flow_status
, raw_flow
, is_flow_step
, q.canceled_reason
, q.job_kind
, q.schedule_path
, q.permissioned_as
, q.flow_status
, coalesce(jp.raw_flow, q.raw_flow)
, q.is_flow_step
, false
, language
, email
, visible_to_owner
, mem_peak
, tag
, priority FROM queue
WHERE id = any($2) AND running = false AND parent_job IS NULL AND workspace_id = $3 AND schedule_path IS NULL FOR UPDATE SKIP LOCKED
, q.language
, q.email
, q.visible_to_owner
, q.mem_peak
, q.tag
, q.priority FROM queue q JOIN job_params jp USING(id, workspace_id)
WHERE q.id = any($2) AND q.running = false AND q.parent_job IS NULL AND q.workspace_id = $3 AND q.schedule_path IS NULL FOR UPDATE SKIP LOCKED
ON CONFLICT (id) DO NOTHING RETURNING id", username, &jobs, w_id, serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}}))
.fetch_all(&mut *tx)
.await?.into_iter().map(|x| x.id).collect::<Vec<Uuid>>();
Expand All @@ -1423,6 +1431,15 @@ async fn cancel_jobs(
)
.execute(&mut *tx)
.await?;

sqlx::query!(
"DELETE FROM job_params WHERE id = any($1) AND workspace_id = $2",
&trivial_jobs,
w_id
)
.execute(&mut *tx)
.await?;

tx.commit().await?;

// sqlx::query!(
Expand Down Expand Up @@ -2225,10 +2242,50 @@ pub async fn get_resume_urls(
Ok(Json(res))
}

/// A [`QueuedJob`] extended with the payload columns that were split out of
/// the `queue` table into `job_params` (`raw_code` / `raw_flow`), plus
/// wait-time metrics that are computed in Rust rather than fetched from SQL.
///
/// Serializes flat (via `serde(flatten)`), so JSON consumers see the same
/// shape as before the split; `None` fields are omitted entirely.
#[derive(sqlx::FromRow, Debug, Serialize)]
pub struct QueuedJobEx {
/// The underlying queue row; its columns are read/written inline
/// (`sqlx(flatten)`) and serialized inline (`serde(flatten)`).
#[sqlx(flatten)]
#[serde(flatten)]
inner: QueuedJob,

// Columns sourced from the `job_params` side table (see migration):
#[serde(skip_serializing_if = "Option::is_none")]
pub raw_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub raw_flow: Option<sqlx::types::Json<Box<RawValue>>>,

// Not present in the SQL result set (`sqlx(skip)`); populated only via
// `QueuedJobEx::new`, e.g. when converting from a `UnifiedJob`.
#[sqlx(skip)]
#[serde(skip_serializing_if = "Option::is_none")]
pub self_wait_time_ms: Option<i64>,
#[sqlx(skip)]
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregate_wait_time_ms: Option<i64>,
}

impl QueuedJobEx {
pub fn new(self_wait_time_ms: Option<i64>, aggregate_wait_time_ms: Option<i64>, inner: QueuedJob) -> Self {
QueuedJobEx { inner, raw_code: None, raw_flow: None, self_wait_time_ms, aggregate_wait_time_ms }
}
}

/// Transparent access to the wrapped [`QueuedJob`], so existing call sites
/// can keep reading `job.field` without going through an accessor.
///
/// NOTE(review): `Deref` to a non-smart-pointer type is usually flagged as
/// "Deref polymorphism"; here it is a deliberate migration aid while
/// `raw_code`/`raw_flow` are split out of `QueuedJob` — consider replacing
/// with explicit accessors once the split settles.
impl Deref for QueuedJobEx {
type Target = QueuedJob;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

/// Mutable counterpart of the `Deref` impl above.
impl DerefMut for QueuedJobEx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

#[derive(Serialize, Debug)]
#[serde(tag = "type")]
pub enum Job {
QueuedJob(QueuedJob),
QueuedJob(QueuedJobEx),
CompletedJob(CompletedJob),
#[serde(rename = "CompletedJob")]
CompletedJobWithFormattedResult(CompletedJobWithFormattedResult),
Expand All @@ -2242,6 +2299,7 @@ impl Job {
Job::CompletedJobWithFormattedResult(job) => &job.cj.created_by,
}
}

pub fn raw_flow(&self) -> Option<FlowValue> {
match self {
Job::QueuedJob(job) => job
Expand Down Expand Up @@ -2593,7 +2651,7 @@ impl<'a> From<UnifiedJob> for Job {
self_wait_time_ms: uj.self_wait_time_ms,
aggregate_wait_time_ms: uj.aggregate_wait_time_ms,
}),
"QueuedJob" => Job::QueuedJob(QueuedJob {
"QueuedJob" => Job::QueuedJob(QueuedJobEx::new(uj.self_wait_time_ms, uj.aggregate_wait_time_ms, QueuedJob {
workspace_id: uj.workspace_id,
id: uj.id,
parent_job: uj.parent_job,
Expand All @@ -2607,7 +2665,6 @@ impl<'a> From<UnifiedJob> for Job {
scheduled_for: uj.scheduled_for.unwrap(),
logs: None,
flow_status: None,
raw_code: None,
raw_lock: None,
canceled: uj.canceled,
canceled_by: uj.canceled_by,
Expand All @@ -2616,7 +2673,6 @@ impl<'a> From<UnifiedJob> for Job {
job_kind: uj.job_kind,
schedule_path: uj.schedule_path,
permissioned_as: uj.permissioned_as,
raw_flow: None,
is_flow_step: uj.is_flow_step,
language: uj.language,
same_worker: false,
Expand All @@ -2634,9 +2690,7 @@ impl<'a> From<UnifiedJob> for Job {
flow_step_id: None,
cache_ttl: None,
priority: uj.priority,
self_wait_time_ms: uj.self_wait_time_ms,
aggregate_wait_time_ms: uj.aggregate_wait_time_ms,
}),
})),
t => panic!("job type {} not valid", t),
}
}
Expand Down Expand Up @@ -3079,19 +3133,20 @@ pub async fn run_workflow_as_code(
i += 1;
}

let job = get_queued_job(&job_id, &w_id, &db).await?;
let job = get_queued_job_ex(&job_id, &w_id, &db, false).await?;

if *CLOUD_HOSTED {
tracing::info!("workflow_as_code_tracing id {i} ");
i += 1;
}

let job = not_found_if_none(job, "Queued Job", &job_id.to_string())?;
let QueuedJobEx { inner: job, raw_code, .. } = job;
let (job_payload, tag, _delete_after_use, timeout) = match job.job_kind {
JobKind::Preview => (
JobPayload::Code(RawCode {
hash: None,
content: job.raw_code.unwrap_or_default(),
content: raw_code.unwrap_or_default(),
path: job.script_path,
language: job.language.unwrap_or_else(|| ScriptLang::Deno),
lock: job.raw_lock,
Expand Down
Loading

0 comments on commit f008baa

Please sign in to comment.