Skip to content

Commit

Permalink
server: simplify task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
pnmadelaine committed Oct 11, 2023
1 parent 9efbf75 commit 4394127
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 46 deletions.
11 changes: 4 additions & 7 deletions typhon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ pub static SETTINGS: Lazy<Settings> = Lazy::new(|| {
webroot: args.webroot.clone(),
}
});
pub static EVALUATIONS: Lazy<tasks::Tasks<i32, Result<nix::NewJobs, nix::Error>>> =
Lazy::new(tasks::Tasks::new);
pub static JOBS_BEGIN: Lazy<tasks::Tasks<i32, Result<String, Error>>> =
Lazy::new(tasks::Tasks::new);
pub static JOBS_BUILD: Lazy<tasks::Tasks<i32, Option<Result<nix::DrvOutputs, nix::Error>>>> =
Lazy::new(tasks::Tasks::new);
pub static JOBS_END: Lazy<tasks::Tasks<i32, Result<String, Error>>> = Lazy::new(tasks::Tasks::new);
pub static EVALUATIONS: Lazy<tasks::Tasks<i32>> = Lazy::new(tasks::Tasks::new);
pub static JOBS_BEGIN: Lazy<tasks::Tasks<i32>> = Lazy::new(tasks::Tasks::new);
pub static JOBS_BUILD: Lazy<tasks::Tasks<i32>> = Lazy::new(tasks::Tasks::new);
pub static JOBS_END: Lazy<tasks::Tasks<i32>> = Lazy::new(tasks::Tasks::new);
pub static CONNECTION: Lazy<Connection> = Lazy::new(|| {
use diesel::Connection as _;
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
Expand Down
2 changes: 1 addition & 1 deletion typhon/src/nix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,6 @@ pub mod build {
}
}

static TASKS: Lazy<Tasks<DrvPath, Output>> = Lazy::new(Tasks::new);
static TASKS: Lazy<Tasks<DrvPath>> = Lazy::new(Tasks::new);
pub static BUILDS: Lazy<Builder> = Lazy::new(Builder::new);
}
58 changes: 20 additions & 38 deletions typhon/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures::future::BoxFuture;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
Expand All @@ -18,16 +17,10 @@ impl std::fmt::Display for Error {
}
}

type CallbackFuture<'a, T> = Box<dyn (FnOnce(T) -> BoxFuture<'a, ()>) + Send + Sync>;

enum Msg<Id, T> {
enum Msg<Id> {
Cancel(Id),
Finish(Id),
Run(
Id,
BoxFuture<'static, T>,
CallbackFuture<'static, Option<T>>,
),
Run(Id, oneshot::Sender<()>, JoinHandle<()>),
Shutdown,
Wait(Id, oneshot::Sender<()>),
}
Expand All @@ -38,23 +31,17 @@ struct TaskHandle {
waiters: Vec<oneshot::Sender<()>>,
}

pub struct Tasks<Id, T> {
pub struct Tasks<Id> {
handle: Mutex<Option<JoinHandle<()>>>,
sender: mpsc::Sender<Msg<Id, T>>,
sender: mpsc::Sender<Msg<Id>>,
}

impl<
Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send + Sync + 'static,
T: Send + 'static,
> Tasks<Id, T>
{
impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send + Sync + 'static> Tasks<Id> {
pub fn new() -> Self {
let (sender, mut receiver) = mpsc::channel(256);
let sender_self = sender.clone();
let handle = tokio::spawn(async move {
let mut tasks: HashMap<Id, TaskHandle> = HashMap::new();
while let Some(msg) = receiver.recv().await {
let sender_self = sender_self.clone();
match msg {
Msg::Cancel(id) => {
let _ = tasks
Expand All @@ -69,19 +56,9 @@ impl<
}
}
}
Msg::Run(id, task, finish) => {
let (send, recv) = oneshot::channel::<()>();
let id_bis = id.clone();
let handle = tokio::spawn(async move {
let r = tokio::select! {
_ = recv => None,
r = task => Some(r),
};
finish(r).await;
let _ = sender_self.send(Msg::Finish(id_bis)).await;
});
Msg::Run(id, sender, handle) => {
let task = TaskHandle {
canceler: Some(send),
canceler: Some(sender),
handle,
waiters: Vec::new(),
};
Expand Down Expand Up @@ -125,6 +102,7 @@ impl<

// TODO: `finish` should be able to output an error
pub async fn run<
T: Send + 'static,
O: Future<Output = T> + Send + 'static,
U: Future<Output = ()> + Send + 'static,
F: (FnOnce(Option<T>) -> U) + Send + Sync + 'static,
Expand All @@ -134,14 +112,18 @@ impl<
task: O,
finish: F,
) {
let _ = self
.sender
.send(Msg::Run(
id,
Box::pin(task),
Box::new(|x| Box::pin(finish(x))),
))
.await;
let (send, recv) = oneshot::channel::<()>();
let sender_self = self.sender.clone();
let id_bis = id.clone();
let handle = tokio::spawn(async move {
let r = tokio::select! {
_ = recv => None,
r = task => Some(r),
};
finish(r).await;
let _ = sender_self.send(Msg::Finish(id_bis)).await;
});
let _ = self.sender.send(Msg::Run(id, send, handle)).await;
}

pub async fn cancel(&self, id: Id) {
Expand Down

0 comments on commit 4394127

Please sign in to comment.