Skip to content

Commit

Permalink
feat: Introduce IntoTaskHandlerResult trait
Browse files Browse the repository at this point in the history
It allows to have no return type in the TaskHandler trait implementation
  • Loading branch information
leo91000 committed May 29, 2024
1 parent b735462 commit 3ccc4d2
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 23 deletions.
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ pub struct CounterTask;
impl TaskHandler for CounterTask {
const IDENTIFIER: &'static str = "counter_task";
async fn run(self, _ctx: WorkerContext) -> Result<(), ()> {
async fn run(self, ctx: WorkerContext) {
let app_state = ctx.extensions().get::<AppState>().unwrap();
let run_count = app_state.run_count.fetch_add(1, SeqCst);
println!("Run count: {run_count}");
Ok(())
}
}
Expand All @@ -138,10 +137,6 @@ async fn main() -> Result<(), ()> {
}
```

### Success!

You should see the worker output `Hello Bobby Tables !`. Gosh, that was fast!

## Features

- Standalone and embedded modes
Expand Down
43 changes: 30 additions & 13 deletions crates/task_handler/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,41 @@ use serde::Serialize;
use std::fmt::Debug;
use std::future::Future;

pub trait IntoTaskHandlerResult {
fn into_task_handler_result(self) -> Result<(), impl Debug>;
}

impl IntoTaskHandlerResult for () {
fn into_task_handler_result(self) -> Result<(), impl Debug> {
Ok::<_, ()>(())
}
}

impl<D: Debug> IntoTaskHandlerResult for Result<(), D> {
fn into_task_handler_result(self) -> Result<(), impl Debug> {
self
}
}

pub trait TaskHandler: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
const IDENTIFIER: &'static str;

fn run(
self,
ctx: WorkerContext,
) -> impl Future<Output = Result<(), impl Debug>> + Send + 'static;
) -> impl Future<Output = impl IntoTaskHandlerResult> + Send + 'static;
}

fn run_from_ctx(
worker_context: WorkerContext,
) -> impl Future<Output = Result<(), String>> + Send + 'static {
let job = serde_json::from_value::<Self>(worker_context.payload().clone());
async move {
let Ok(job) = job else {
let e = job.err().unwrap();
return Err(format!("{e:?}"));
};
job.run(worker_context).await.map_err(|e| format!("{e:?}"))
}
}
pub async fn run_task_from_worker_ctx<T: TaskHandler>(
worker_context: WorkerContext,
) -> Result<(), String> {
let job = serde_json::from_value::<T>(worker_context.payload().clone());
let Ok(job) = job else {
let e = job.err().unwrap();
return Err(format!("{e:?}"));
};
job.run(worker_context)
.await
.into_task_handler_result()
.map_err(|e| format!("{e:?}"))
}
2 changes: 2 additions & 0 deletions crates/task_handler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod handler;

pub use handler::run_task_from_worker_ctx;
pub use handler::IntoTaskHandlerResult;
pub use handler::TaskHandler;
3 changes: 1 addition & 2 deletions examples/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ struct ShowRunCount;
impl TaskHandler for ShowRunCount {
const IDENTIFIER: &'static str = "show_run_count";

async fn run(self, ctx: WorkerContext) -> Result<(), String> {
async fn run(self, ctx: WorkerContext) {
let app_state = ctx.extensions().get::<AppState>().unwrap();
let run_count = app_state.run_count.fetch_add(1, SeqCst);
println!("Run count: {run_count}");
Ok(())
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use graphile_worker_ctx::WorkerContext;
use graphile_worker_extensions::Extensions;
use graphile_worker_migrations::migrate;
use graphile_worker_shutdown_signal::shutdown_signal;
use graphile_worker_task_handler::TaskHandler;
use graphile_worker_task_handler::{run_task_from_worker_ctx, TaskHandler};
use rand::RngCore;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
Expand Down Expand Up @@ -149,7 +149,7 @@ impl WorkerOptions {

let worker_fn = move |ctx: WorkerContext| {
let ctx = ctx.clone();
T::run_from_ctx(ctx).boxed()
run_task_from_worker_ctx::<T>(ctx).boxed()
};

self.jobs
Expand Down

0 comments on commit 3ccc4d2

Please sign in to comment.