Skip to content

Commit

Permalink
Re-format
Browse files Browse the repository at this point in the history
  • Loading branch information
valff committed Oct 31, 2024
1 parent 6260dd9 commit 0bb0236
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 80 deletions.
54 changes: 42 additions & 12 deletions agentwire/src/agent/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use rkyv::{de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible, Serialize};
use rkyv::{
de::deserializers::SharedDeserializeMap, Archive, Deserialize, Infallible,
Serialize,
};
use std::{
env,
error::Error,
Expand Down Expand Up @@ -104,7 +107,8 @@ where
<Self as Archive>::Archived: Deserialize<Self, Infallible>,
Self::Input: Archive + for<'a> Serialize<SharedSerializer<'a>>,
Self::Output: Archive + for<'a> Serialize<SharedSerializer<'a>>,
<Self::Output as Archive>::Archived: Deserialize<Self::Output, SharedDeserializeMap>,
<Self::Output as Archive>::Archived:
Deserialize<Self::Output, SharedDeserializeMap>,
{
/// Error type returned by the agent.
type Error: Debug;
Expand Down Expand Up @@ -136,9 +140,13 @@ where
wait_kill_rx.await.unwrap();
tracing::info!("Process agent {} killed", Self::NAME);
};
let spawn_process = spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger);
let spawn_process =
spawn_process_impl(self, inner, send_kill_rx, wait_kill_tx, logger);
spawn_named_thread(format!("proc-ipc-{}", Self::NAME), || {
let rt = runtime::Builder::new_current_thread().enable_all().build().unwrap();
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(task::LocalSet::new().run_until(spawn_process));
});
(outer, kill.boxed())
Expand Down Expand Up @@ -171,7 +179,9 @@ where
/// This function must be called as early in the program lifetime as possible.
/// Everything before this function call gets duplicated for each process-based
/// agent.
pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box<dyn Error>>) {
pub fn init(
call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box<dyn Error>>,
) {
match (env::var(SHMEM_ENV), env::var(PARENT_PID_ENV)) {
(Ok(shmem), Ok(parent_pid)) => {
let result = unsafe { libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) };
Expand All @@ -188,7 +198,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box<dyn
}
let shmem_fd = unsafe {
OwnedFd::from_raw_fd(
shmem.parse::<RawFd>().expect("shared memory file descriptor to be an integer"),
shmem
.parse::<RawFd>()
.expect("shared memory file descriptor to be an integer"),
)
};
// Agent's name is the first argument.
Expand All @@ -198,7 +210,9 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box<dyn
.expect("mega-agent process name should start with 'proc-'");
match call_process_agent(name, shmem_fd) {
Ok(()) => tracing::warn!("Agent {name} exited"),
Err(err) => tracing::error!("Agent {name} exited with an error: {err:#?}"),
Err(err) => {
tracing::error!("Agent {name} exited with an error: {err:#?}")
}
}
process::exit(1);
}
Expand All @@ -215,7 +229,11 @@ pub fn init(call_process_agent: impl FnOnce(&str, OwnedFd) -> Result<(), Box<dyn
}

/// Creates a default process agent logger.
pub async fn default_logger(agent_name: &'static str, stdout: ChildStdout, stderr: ChildStderr) {
pub async fn default_logger(
agent_name: &'static str,
stdout: ChildStdout,
stderr: ChildStderr,
) {
let mut stdout = BufReader::new(stdout).lines();
let mut stderr = BufReader::new(stderr).lines();
loop {
Expand Down Expand Up @@ -265,7 +283,8 @@ async fn spawn_process_impl<T: Process, Fut, F>(
let (shmem_fd, close) = inner
.into_shared_memory(T::NAME, &init_state, recovered_inputs)
.expect("couldn't initialize shared memory");
let exe = env::current_exe().expect("couldn't determine current executable file");
let exe =
env::current_exe().expect("couldn't determine current executable file");

let initializer = T::initializer();
let mut child_fds = initializer.keep_file_descriptors();
Expand All @@ -275,7 +294,10 @@ async fn spawn_process_impl<T: Process, Fut, F>(
.arg0(format!("proc-{}", T::NAME))
.args(
env::var(ARGS_ENV)
.map(|args| shell_words::split(&args).expect("invalid process arguments"))
.map(|args| {
shell_words::split(&args)
.expect("invalid process arguments")
})
.unwrap_or_default(),
)
.envs(initializer.envs())
Expand All @@ -295,8 +317,16 @@ async fn spawn_process_impl<T: Process, Fut, F>(
drop(shmem_fd);
drop(initializer);
let pid = Pid::from_raw(child.id().unwrap().try_into().unwrap());
task::spawn(logger(T::NAME, child.stdout.take().unwrap(), child.stderr.take().unwrap()));
tracing::info!("Process agent {} spawned with PID: {}", T::NAME, pid.as_raw());
task::spawn(logger(
T::NAME,
child.stdout.take().unwrap(),
child.stderr.take().unwrap(),
));
tracing::info!(
"Process agent {} spawned with PID: {}",
T::NAME,
pid.as_raw()
);
match future::select(Box::pin(child.wait()), &mut send_kill_rx).await {
Either::Left((status, _)) => {
let status = status.expect("failed to run a sub-process");
Expand Down
10 changes: 8 additions & 2 deletions agentwire/src/agent/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ pub trait Task: Agent + Send {
type Error: Debug;

/// Runs the agent event-loop inside a dedicated asynchronous task.
fn run(self, port: port::Inner<Self>) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn run(
self,
port: port::Inner<Self>,
) -> impl Future<Output = Result<(), Self::Error>> + Send;

/// Spawns a new task running the agent event-loop and returns a handle for
/// bi-directional communication with the agent.
Expand All @@ -23,7 +26,10 @@ pub trait Task: Agent + Send {
tracing::warn!("Task agent {} exited", Self::NAME);
}
Err(err) => {
tracing::error!("Task agent {} exited with error: {err:#?}", Self::NAME);
tracing::error!(
"Task agent {} exited with error: {err:#?}",
Self::NAME
);
}
}
});
Expand Down
5 changes: 4 additions & 1 deletion agentwire/src/agent/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ pub trait Thread: Agent + Send {
tracing::warn!("Thread agent {} exited", Self::NAME);
}
Err(err) => {
tracing::error!("Thread agent {} exited with error: {err:#?}", Self::NAME);
tracing::error!(
"Thread agent {} exited with error: {err:#?}",
Self::NAME
);
}
}
});
Expand Down
3 changes: 2 additions & 1 deletion agentwire/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ where
.name(name.clone())
.spawn(move || {
if let Ok(title) = CString::new(name.as_bytes()) {
let result = unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) };
let result =
unsafe { libc::prctl(libc::PR_SET_NAME, title.as_ptr(), 0, 0, 0) };
if result == -1 {
eprintln!(
"failed to set thread name to '{name}': {:#?}",
Expand Down
Loading

0 comments on commit 0bb0236

Please sign in to comment.