Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend Expose Test #534

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions examples/actix-web/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,28 @@ async fn main() -> Result<()> {
future::try_join(http, worker).await?;
Ok(())
}

// #[cfg(test)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this

// mod tests {

// async fn push_email_test() -> () {
// // curl an email
// // check redis if email job exists
// let client = reqwest::Client::new();
// let res = client
// .post("http://localhost:8000/emails/push")
// .body(
// "{
// 'to': '[email protected]',
// 'subject': 'Message from Web',
// 'text': 'This is the text'
// }",
// )
// .send()
// .await?;

// let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
// let conn = apalis_redis::connect(redis_url);
// let exists: bool = conn
// }
// }
65 changes: 64 additions & 1 deletion packages/apalis-sql/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl<J: 'static + Serialize + DeserializeOwned + Unpin + Send + Sync> BackendExp
COUNT(1) FILTER (WHERE status = 'Killed') AS killed
FROM Jobs WHERE job_type = ?";

let res: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(fetch_query)
let res: (i64, i64, i64, i64, i64) = sqlx::query_as(fetch_query)
.bind(self.get_config().namespace())
.fetch_one(self.pool())
.await?;
Expand Down Expand Up @@ -883,4 +883,67 @@ mod tests {
assert_eq!(*ctx.last_error(), Some("{\"Ok\":\"success\"}".to_owned()));
assert_eq!(job.parts.attempt.current(), 1);
}

#[tokio::test]
async fn test_list_workers() {
let mut storage = setup().await;
register_worker(&mut storage).await;

let workers = storage.list_workers().await.expect("no workers found!");
assert_eq!(workers.len(), 1);
}

/// Pushes email jobs and verifies they are listed correctly.
#[tokio::test]
async fn test_push_and_list_10_jobs() {
let mut storage = setup().await;

// Push 100 email jobs
for i in 0..10 {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pushes 10 not 100

push_email(
&mut storage,
Email {
subject: format!("Test Subject {i}"),
to: format!("user{i}@example.com"),
text: format!("This is test email number {i}"),
},
)
.await;
}

// Verify the count of jobs
let jobs_num = storage
.list_jobs(&State::Pending, 1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to ensure other states return nothing

.await
.expect("Failed to get job Count");
assert_eq!(jobs_num.len(), 10);
}

#[tokio::test]
async fn test_stats() {
let mut storage = setup().await;
let worker = register_worker(&mut storage).await;

// Push 100 email jobs
for i in 0..10 {
push_email(
&mut storage,
Email {
subject: format!("Test Subject {i}"),
to: format!("user{i}@example.com"),
text: format!("This is test email number {i}"),
},
)
.await;
}

let stats = storage.stats().await.expect("Failed to get stats");

assert_eq!(stats.pending, 10);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert everything not just pending


let job = consume_one(&mut storage, &worker).await;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than consume one, take a look at this approach:

let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
    // TODO: fail some jobs here
    Ok::<_, io::Error>(request.args)
});
let (mut t, poller) = TestWrapper::new_with_service(backend, service);
tokio::spawn(poller);
let res = t.len().await.unwrap();
assert_eq!(res, 0, "There should be no jobs");
t.push(1).await.unwrap();
let res = t.len().await.unwrap();
assert_eq!(res, 1, "A job exists");
let res = t.execute_next().await.unwrap();
assert_eq!(res.1, Ok("1".to_owned()));
apalis_core::sleep(Duration::from_secs(1)).await;
let res = t.len().await.unwrap();
assert_eq!(res, 0, "There should be no job");

I would repeat this several times in a loop, replacing t.len().await.unwrap(); with the stats call.

let ctx = job.parts.context;
assert_eq!(stats.pending, 10);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you consume one then this should be 9

assert_eq!(*ctx.status(), State::Running);
}
}
Loading