Skip to content

Commit

Permalink
server: fix task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
pnmadelaine committed Sep 30, 2023
1 parent 69bcd3e commit 8f596f0
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 29 deletions.
2 changes: 1 addition & 1 deletion typhon/src/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn evaluate_aux(id: i32, new_jobs: nix::NewJobs) -> Result<(), Error> {

impl Evaluation {
pub async fn cancel(&self) -> Result<(), Error> {
let r = EVALUATIONS.cancel(self.evaluation_id).await;
let r = EVALUATIONS.cancel(&self.evaluation_id).await;
if r {
Ok(())
} else {
Expand Down
6 changes: 3 additions & 3 deletions typhon/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::path::Path;

impl Job {
pub async fn cancel(&self) -> Result<(), Error> {
let a = JOBS_PRE.cancel(self.job_id).await;
let b = JOBS_BUILD.cancel(self.job_id).await;
let c = JOBS_POST.cancel(self.job_id).await;
let a = JOBS_PRE.cancel(&self.job_id).await;
let b = JOBS_BUILD.cancel(&self.job_id).await;
let c = JOBS_POST.cancel(&self.job_id).await;
if a || b || c {
Ok(())
} else {
Expand Down
42 changes: 17 additions & 25 deletions typhon/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::future::Future;

#[derive(Debug)]
struct TaskHandle {
canceler: Sender<()>,
canceler: Option<Sender<()>>,
waiters: Vec<Sender<()>>,
}

Expand Down Expand Up @@ -55,40 +55,32 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
) -> () {
let (send, recv) = channel::<()>();
let handle = TaskHandle {
canceler: send,
canceler: Some(send),
waiters: Vec::new(),
};
let mut m = self.tasks.lock().await;
m.insert(id.clone(), handle);
drop(m);
tokio::spawn(async move {
tokio::select! {
_ = recv => {
f(None).await
},
r = task => {
self.tasks.lock().await.remove(&id).map(|task| {
for send in task.waiters {
let _ = send.send(());
}
});
f(Some(r)).await
},
}
let r = tokio::select! {
_ = recv => None,
r = task => Some(r),
};
f(r).await;
self.tasks.lock().await.remove(&id).map(|task| {
for send in task.waiters {
let _ = send.send(());
}
});
});
}

pub async fn cancel(&self, id: Id) -> bool {
pub async fn cancel(&self, id: &Id) -> bool {
let mut tasks = self.tasks.lock().await;
tasks
.remove(&id)
.map(|task| {
for send in task.waiters {
let _ = send.send(());
}
let _ = task.canceler.send(());
true
})
.unwrap_or(false)
.get_mut(&id)
.map(|task| task.canceler.take().map(|send| send.send(())))
.flatten()
.is_some()
}
}

0 comments on commit 8f596f0

Please sign in to comment.