Skip to content

Commit

Permalink
tests: Add more run_once test
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 4, 2024
1 parent 4395c93 commit 3f62ac2
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 1 deletion.
20 changes: 20 additions & 0 deletions tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::task::LocalSet;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;

#[derive(FromRow, Debug)]
pub struct Job {
Expand Down Expand Up @@ -200,3 +203,20 @@ impl StaticCounter {
*cell.lock().await
}
}

pub async fn enable_logs() {
static ONCE: OnceCell<()> = OnceCell::const_new();

ONCE.get_or_init(|| async {
let fmt_layer = tracing_subscriber::fmt::layer();
// Log level set to debug except for sqlx set at warn (to not show all sql requests)
let filter_layer =
EnvFilter::try_new("debug,sqlx=warn,graphile_worker_migrations=warn").unwrap();

tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
})
.await;
}
136 changes: 135 additions & 1 deletion tests/run_once.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::ops::Add;

use crate::helpers::StaticCounter;
use chrono::Utc;
use chrono::{Timelike, Utc};
use graphile_worker::JobSpec;
use serde_json::json;

Expand Down Expand Up @@ -271,3 +273,135 @@ async fn it_should_supports_future_scheduled_jobs() {
})
.await;
}

#[tokio::test]
async fn it_shoud_allow_update_of_pending_jobs() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, payload: serde_json::Value| async move {
JOB3_CALL_COUNT.increment().await;
assert_eq!(payload, json!({ "a": "right" }));
Ok(()) as Result<(), ()>
})
.init()
.await
.expect("Failed to create worker");

// Rust is more precise than postgres, so we need to remove the nanoseconds
let run_at = Utc::now()
.add(chrono::Duration::seconds(60))
.with_nanosecond(0)
.unwrap();
let utils = worker.create_utils();
// Schedule a future job - note incorrect payload
utils
.add_raw_job(
"job3",
json!({ "a": "wrong" }),
Some(JobSpec {
run_at: Some(run_at),
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");

// Assert that it has an entry in jobs / job_queues
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 1);
let job = &jobs[0];
assert_eq!(job.run_at, run_at);

// Run all jobs (none are ready)
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 0);

// update the job to run immediately with correct payload
let now = Utc::now().with_nanosecond(0).unwrap();
utils
.add_raw_job(
"job3",
json!({ "a": "right" }),
Some(JobSpec {
run_at: Some(now),
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");

// Assert that it has updated the existing entry and not created a new one
let updated_jobs = test_db.get_jobs().await;
assert_eq!(updated_jobs.len(), 1);
let updated_job = &updated_jobs[0];
assert_eq!(job.id, updated_job.id);
assert_eq!(updated_job.run_at, now);

// Run the task
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 1);
})
.await;
}

#[tokio::test]
async fn it_schedules_a_new_job_if_existing_is_completed() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, _: serde_json::Value| async move {
JOB3_CALL_COUNT.increment().await;
Ok(()) as Result<(), ()>
})
.init()
.await
.expect("Failed to create worker");

let utils = worker.create_utils();
// Schedule a job to run immediately with job key "abc" and a payload.
utils
.add_raw_job(
"job3",
json!({ "a": "first" }),
Some(JobSpec {
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");

// Run the worker once to process the job.
worker.run_once().await.expect("Failed to run worker");

// Assert the job has been called once.
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

// Attempt to update the job. Since the previous job is completed, it should schedule a new one.
utils
.add_raw_job(
"job3",
json!({ "a": "second" }),
Some(JobSpec {
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");

// Run the worker again to process the new job.
worker.run_once().await.expect("Failed to run worker");

// Assert the job has been called twice, indicating the new job was scheduled and run.
assert_eq!(JOB3_CALL_COUNT.get().await, 2);
})
.await;
}

0 comments on commit 3f62ac2

Please sign in to comment.