Skip to content

Commit

Permalink
server: fix task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
pnmadelaine committed Sep 30, 2023
1 parent 69bcd3e commit 8b489da
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 38 deletions.
2 changes: 1 addition & 1 deletion typhon/src/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn evaluate_aux(id: i32, new_jobs: nix::NewJobs) -> Result<(), Error> {

impl Evaluation {
pub async fn cancel(&self) -> Result<(), Error> {
let r = EVALUATIONS.cancel(self.evaluation_id).await;
let r = EVALUATIONS.cancel(&self.evaluation_id).await;
if r {
Ok(())
} else {
Expand Down
6 changes: 3 additions & 3 deletions typhon/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::path::Path;

impl Job {
pub async fn cancel(&self) -> Result<(), Error> {
let a = JOBS_PRE.cancel(self.job_id).await;
let b = JOBS_BUILD.cancel(self.job_id).await;
let c = JOBS_POST.cancel(self.job_id).await;
let a = JOBS_PRE.cancel(&self.job_id).await;
let b = JOBS_BUILD.cancel(&self.job_id).await;
let c = JOBS_POST.cancel(&self.job_id).await;
if a || b || c {
Ok(())
} else {
Expand Down
69 changes: 35 additions & 34 deletions typhon/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,33 @@ use std::future::Future;

#[derive(Debug)]
struct TaskHandle {
canceler: Sender<()>,
canceler: Option<Sender<()>>,
waiters: Vec<Sender<()>>,
}

#[derive(Debug)]
struct TasksUnwrapped<Id> {
handles: HashMap<Id, TaskHandle>,
}

#[derive(Debug)]
pub struct Tasks<Id> {
tasks: Mutex<HashMap<Id, TaskHandle>>,
tasks: Mutex<TasksUnwrapped<Id>>,
}

impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
pub fn new() -> Self {
Tasks {
tasks: Mutex::new(HashMap::new()),
tasks: Mutex::new(TasksUnwrapped {
handles: HashMap::new(),
}),
}
}

pub async fn wait(&self, id: &Id) -> () {
let mut tasks = self.tasks.lock().await;
let (send, recv) = channel::<()>();
match tasks.get_mut(&id) {
match tasks.handles.get_mut(&id) {
Some(task) => {
task.waiters.push(send);
}
Expand All @@ -39,7 +46,7 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {

pub async fn is_running(&self, id: &Id) -> bool {
let tasks = self.tasks.lock().await;
tasks.get(&id).is_some()
tasks.handles.get(&id).is_some()
}

pub async fn run<
Expand All @@ -53,42 +60,36 @@ impl<Id: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Send> Tasks<Id> {
task: T,
f: F,
) -> () {
let mut tasks = self.tasks.lock().await;
let (send, recv) = channel::<()>();
let handle = TaskHandle {
canceler: send,
canceler: Some(send),
waiters: Vec::new(),
};
let mut m = self.tasks.lock().await;
m.insert(id.clone(), handle);
drop(m);
tasks.handles.insert(id.clone(), handle);
drop(tasks);
tokio::spawn(async move {
tokio::select! {
_ = recv => {
f(None).await
},
r = task => {
self.tasks.lock().await.remove(&id).map(|task| {
for send in task.waiters {
let _ = send.send(());
}
});
f(Some(r)).await
},
}
let r = tokio::select! {
_ = recv => None,
r = task => Some(r),
};
f(r).await;
self.tasks.lock().await.handles.remove(&id).map(|handle| {
for send in handle.waiters {
let _ = send.send(());
}
});
});
}

pub async fn cancel(&self, id: Id) -> bool {
let mut tasks = self.tasks.lock().await;
tasks
.remove(&id)
.map(|task| {
for send in task.waiters {
let _ = send.send(());
}
let _ = task.canceler.send(());
true
})
.unwrap_or(false)
pub async fn cancel(&self, id: &Id) -> bool {
self.tasks
.lock()
.await
.handles
.get_mut(&id)
.map(|task| task.canceler.take().map(|send| send.send(())))
.flatten()
.is_some()
}
}

0 comments on commit 8b489da

Please sign in to comment.