Skip to content

Commit

Permalink
server: broadcast events with streams
Browse files Browse the repository at this point in the history
  • Loading branch information
pnmadelaine committed Oct 2, 2023
1 parent d0acd80 commit 60002c8
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 141 deletions.
19 changes: 0 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ actix = "0.13"
actix-cors = "0.6"
actix-files = "0.6"
actix-web = "4.2"
actix-web-actors = "4.1"
age = { version = "0.8", features = ["armor"] }
async-stream = "0.3"
async-trait = "0.1"
Expand Down
1 change: 0 additions & 1 deletion typhon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ authors = [ "Paul-Nicolas Madelaine <[email protected]>" ]
typhon-types.workspace = true
actix-cors.workspace = true
actix-files.workspace = true
actix-web-actors.workspace = true
actix-web.workspace = true
actix.workspace = true
age.workspace = true
Expand Down
15 changes: 10 additions & 5 deletions typhon/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use crate::actions::webhooks;
use crate::listeners::Session;
use crate::requests::*;
use crate::{handle_request, handles, Response, ResponseError, User};
use crate::{BUILD_LOGS, SETTINGS};
use crate::{BUILD_LOGS, EVENT_LOGGER, SETTINGS};
use actix_cors::Cors;
use actix_files::NamedFile;
use actix_web::{
body::EitherBody, guard, http::StatusCode, web, Error, HttpRequest, HttpResponse, Responder,
};
use actix_web_actors::ws;
use std::collections::HashMap;

struct ResponseWrapper(crate::Response);
Expand Down Expand Up @@ -215,8 +213,15 @@ async fn raw_request(
web::Json(handle_request(user, body.into_inner()).await)
}

async fn events(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
ws::start(Session::new(), &req, stream)
async fn events() -> Option<HttpResponse> {
use futures::StreamExt;
EVENT_LOGGER.lock().await.listen().map(|stream| {
HttpResponse::Ok().streaming(stream.map(|x: typhon_types::Event| {
Ok::<_, actix_web::Error>(actix_web::web::Bytes::from(
serde_json::to_string(&x).unwrap(),
))
}))
})
}

async fn webhook(
Expand Down
2 changes: 1 addition & 1 deletion typhon/src/evaluations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Evaluation {
gcroots::update(&mut *conn);
drop(conn);

log_event(Event::EvaluationFinished(handle));
log_event(Event::EvaluationFinished(handle)).await;
};
EVALUATIONS.run(id, task, f).await?;

Expand Down
48 changes: 48 additions & 0 deletions typhon/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use typhon_types::Event;

use futures_core::stream::Stream;
use tokio::sync::mpsc::*;

pub struct EventLogger {
senders: Vec<Sender<Event>>,
shutdown: bool,
}

impl EventLogger {
pub fn new() -> Self {
Self {
senders: Vec::new(),
shutdown: false,
}
}

pub async fn log(&mut self, e: Event) {
let mut senders: Vec<Sender<Event>> = Vec::new();
for sender in self.senders.drain(..) {
match sender.send(e.clone()).await {
Ok(()) => senders.push(sender),
Err(_) => (),
};
}
self.senders = senders;
}

pub fn listen(&mut self) -> Option<impl Stream<Item = Event>> {
if self.shutdown {
None
} else {
let (sender, mut receiver) = channel(256);
self.senders.push(sender);
Some(async_stream::stream! {
while let Some(e) = receiver.recv().await {
yield e;
}
})
}
}

pub fn shutdown(&mut self) {
self.shutdown = true;
self.senders.clear()
}
}
6 changes: 3 additions & 3 deletions typhon/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Job {
))
.execute(&mut *conn);
drop(conn);
log_event(Event::JobUpdated(handle_2));
log_event(Event::JobUpdated(handle_2)).await;
};
JOBS_BEGIN.run(id, task_begin, finish_begin).await?;

Expand Down Expand Up @@ -202,7 +202,7 @@ impl Job {
))
.execute(&mut *conn);
drop(conn);
log_event(Event::JobUpdated(handle_3));
log_event(Event::JobUpdated(handle_3)).await;
};
JOBS_BUILD.run(id, task_build, finish_build).await?;

Expand Down Expand Up @@ -264,7 +264,7 @@ impl Job {
.set((job_end_status.eq(status), job_end_time_finished.eq(now())))
.execute(&mut *conn);
drop(conn);
log_event(Event::JobUpdated(handle_5));
log_event(Event::JobUpdated(handle_5)).await;
};
JOBS_END.run(id, task_end, finish_end).await?;

Expand Down
2 changes: 1 addition & 1 deletion typhon/src/jobsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Jobset {
drop(conn);

let handle = evaluation.handle().await?;
log_event(Event::EvaluationNew(handle.clone()));
log_event(Event::EvaluationNew(handle.clone())).await;
evaluation.run().await?;

Ok(handle)
Expand Down
15 changes: 8 additions & 7 deletions typhon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod actions;
mod error;
mod evaluations;
mod events;
mod gcroots;
mod jobs;
mod jobsets;
Expand All @@ -11,7 +12,6 @@ mod schema;
mod time;

pub mod api;
pub mod listeners;
pub mod logs;
pub mod tasks;

Expand Down Expand Up @@ -101,8 +101,8 @@ pub static CONNECTION: Lazy<Connection> = Lazy::new(|| {
.unwrap_or_else(|e| panic!("Error connecting to {}, with error {:#?}", database_url, e));
Connection::new(conn)
});
pub static LISTENERS: Lazy<Mutex<listeners::Listeners>> =
Lazy::new(|| Mutex::new(listeners::Listeners::new()));
pub static EVENT_LOGGER: Lazy<Mutex<events::EventLogger>> =
Lazy::new(|| Mutex::new(events::EventLogger::new()));
pub static BUILD_LOGS: Lazy<logs::live::Cache<nix::DrvPath>> = Lazy::new(logs::live::Cache::new);
pub static CURRENT_SYSTEM: Lazy<String> = Lazy::new(nix::current_system);

Expand Down Expand Up @@ -271,16 +271,17 @@ pub async fn handle_request(user: User, req: requests::Request) -> Result<Respon
})?)
}

pub fn log_event(event: Event) {
pub async fn log_event(event: Event) {
log::info!("event: {:?}", event);
let _ = tokio::spawn(async move {
LISTENERS.lock().await.log(event);
});
EVENT_LOGGER.lock().await.log(event).await;
}

pub async fn shutdown() {
eprintln!("Typhon is shutting down...");
tokio::join!(
async {
EVENT_LOGGER.lock().await.shutdown();
},
EVALUATIONS.shutdown(),
JOBS_BUILD.shutdown(),
JOBS_BEGIN.shutdown(),
Expand Down
97 changes: 0 additions & 97 deletions typhon/src/listeners.rs

This file was deleted.

12 changes: 6 additions & 6 deletions typhon/src/projects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Project {
.values(&new_project)
.execute(&mut *conn)?;
drop(conn);
log_event(Event::ProjectNew(project_handle.clone()));
log_event(Event::ProjectNew(project_handle.clone())).await;
Ok(())
}
}
Expand All @@ -67,7 +67,7 @@ impl Project {
pub async fn delete(&self) -> Result<(), Error> {
let mut conn = connection().await;
diesel::delete(projects.find(self.project_id)).execute(&mut *conn)?;
log_event(Event::ProjectDeleted(self.handle()));
log_event(Event::ProjectDeleted(self.handle())).await;
Ok(())
}

Expand Down Expand Up @@ -174,7 +174,7 @@ impl Project {
.execute(&mut *conn)?;
gcroots::update(&mut *conn);
drop(conn);
log_event(Event::ProjectUpdated(self.handle()));
log_event(Event::ProjectUpdated(self.handle())).await;

Ok(())
}
Expand All @@ -185,7 +185,7 @@ impl Project {
.set((project_url.eq(&decl.url), project_legacy.eq(decl.legacy)))
.execute(&mut *conn)?;
drop(conn);
log_event(Event::ProjectUpdated(self.handle()));
log_event(Event::ProjectUpdated(self.handle())).await;
Ok(())
}

Expand All @@ -196,7 +196,7 @@ impl Project {
.set(project_key.eq(key))
.execute(&mut *conn)?;
drop(conn);
log_event(Event::ProjectUpdated(self.handle()));
log_event(Event::ProjectUpdated(self.handle())).await;
Ok(())
}

Expand Down Expand Up @@ -265,7 +265,7 @@ impl Project {
gcroots::update(&mut *conn);
drop(conn);

log_event(Event::ProjectJobsetsUpdated(self.handle()));
log_event(Event::ProjectJobsetsUpdated(self.handle())).await;

Ok(decls.into_keys().collect())
}
Expand Down

0 comments on commit 60002c8

Please sign in to comment.