Skip to content

Commit

Permalink
fix(octez-thread): clean up octez processes when error occurs before …
Browse files Browse the repository at this point in the history
…handle join
  • Loading branch information
zcabter authored and johnyob committed Jul 22, 2024
1 parent 3b57b56 commit d87a03c
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 31 deletions.
136 changes: 124 additions & 12 deletions crates/jstz_cli/src/sandbox/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use octez::OctezThread;
use regex::Regex;
use std::io::Write;
use std::{
cell::RefCell,
env,
fs::{self, File, OpenOptions},
io::{BufRead, BufReader, Seek},
path::PathBuf,
process::{Child, Command, Stdio},
rc::Rc,
thread::{self, sleep},
time::Duration,
};
Expand Down Expand Up @@ -100,6 +102,114 @@ pub struct SandboxBootstrapAccount<'a> {
pub secret: &'a str,
}

pub struct Sandbox {
config: Rc<RefCell<Config>>,
log_file: Rc<RefCell<File>>,
octez_node: Option<OctezThread>,
octez_baker: Option<OctezThread>,
rollup_node: Option<OctezThread>,
}

impl Sandbox {
pub fn new(config: Rc<RefCell<Config>>, log_file: Rc<RefCell<File>>) -> Sandbox {
Sandbox {
config,
log_file,
octez_node: None,
octez_baker: None,
rollup_node: None,
}
}

fn shutdown(&mut self) -> Result<()> {
if let Some(mut t) = self.rollup_node.take() {
t.shutdown()?;
}
if let Some(mut b) = self.octez_baker.take() {
b.shutdown()?;
}
if let Some(mut n) = self.octez_node.take() {
n.shutdown()?;
}
self.remove_sandbox_from_config()?;
Ok(())
}

fn remove_sandbox_from_config(&mut self) -> Result<()> {
let mut config = self.config.borrow_mut();
config.reload()?;
config.sandbox = None;
config.save()
}

pub fn join(&mut self) -> Result<()> {
let mut threads: Vec<&mut OctezThread> = Vec::new();
if let Some(n) = &mut self.octez_node {
threads.push(n);
}

if let Some(b) = &mut self.octez_baker {
threads.push(b)
}

if let Some(r) = &mut self.rollup_node {
threads.push(r)
}

OctezThread::join(&mut threads)
}

pub fn set_octez_node(&mut self, octez_node: OctezThread) -> Result<()> {
match self.octez_node {
Some(_) => {
debug!(self.log_file.borrow_mut(), "Error: Octez node already set");
panic!();
}
None => {
self.octez_node = Some(octez_node);
Ok(())
}
}
}

pub fn set_baker(&mut self, octez_baker: OctezThread) -> Result<()> {
match self.octez_baker {
Some(_) => {
debug!(self.log_file.borrow_mut(), "Error: Octez baker already set");
panic!();
}
None => {
self.octez_baker = Some(octez_baker);
Ok(())
}
}
}

pub fn set_rollup_node(&mut self, rollup_node: OctezThread) -> Result<()> {
match self.rollup_node {
Some(_) => {
debug!(self.log_file.borrow_mut(), "Error: Rollup node already set");
panic!();
}
None => {
self.rollup_node = Some(rollup_node);
Ok(())
}
}
}
}

impl Drop for Sandbox {
fn drop(&mut self) {
if let Err(e) = self.shutdown() {
debug!(
self.log_file.borrow_mut(),
"Failed to shutdown sandbox: {:?}", e
);
}
}
}

const ACTIVATOR_ACCOUNT_ALIAS: &str = "activator";
fn sandbox_daemon_log_path(cfg: &Config) -> PathBuf {
cfg.sandbox_logs_dir().join("sandbox_daemon.log")
Expand Down Expand Up @@ -313,14 +423,18 @@ async fn run_jstz_node(cfg: &Config) -> Result<()> {
fn start_sandbox(
log_file: &mut File,
progress: &mut u32,
cfg: &Config,
) -> Result<(OctezThread, OctezThread, OctezThread)> {
cfg: &mut Config,
) -> Result<Sandbox> {
let config = Rc::new(RefCell::new(cfg.clone()));
let logs = Rc::new(RefCell::new(log_file.try_clone()?));
let mut sandbox = Sandbox::new(config, logs);
// 1. Init node
init_node(log_file, progress, cfg)?;

// 2. As a thread, start node
progress_step(log_file, progress);
let node = OctezThread::from_child(start_node(cfg)?);
sandbox.set_octez_node(node)?;
debug!(log_file, "Started octez-node");

// 3. Init client
Expand All @@ -335,6 +449,7 @@ fn start_sandbox(
client_bake(cfg, &client_logs)?;
Ok(())
});
sandbox.set_baker(baker)?;
debug!(log_file, "Started baker (using octez-client)");

// 5. Deploy bridge
Expand Down Expand Up @@ -373,7 +488,6 @@ fn start_sandbox(
progress_step(log_file, progress);
let rollup =
JstzRollup::deploy(&cfg.octez_client_sandbox()?, OPERATOR_ADDRESS, &installer)?;

debug!(log_file, "`jstz_rollup` originated at {}", rollup);

// 8. As a thread, start rollup node
Expand All @@ -389,14 +503,15 @@ fn start_sandbox(
SANDBOX_LOCAL_HOST_ADDR,
SANDBOX_OCTEZ_SMART_ROLLUP_PORT,
)?);
sandbox.set_rollup_node(rollup_node)?;
debug!(log_file, "Started octez-smart-rollup-node");

// 9. Set the rollup address in the bridge
progress_step(log_file, progress);
bridge.set_rollup(&cfg.octez_client_sandbox()?, OPERATOR_ADDRESS, &rollup)?;
debug!(log_file, "`jstz_bridge` `rollup` address set to {}", rollup);

Ok((node, baker, rollup_node))
Ok(sandbox)
}

fn format_sandbox_bootstrap_accounts() -> Table {
Expand Down Expand Up @@ -437,9 +552,10 @@ pub async fn run_sandbox(cfg: &mut Config) -> Result<()> {

// 1. Configure sandbox
debug!(log_file, "Configuring sandbox...");
// TODO: Temp directories should be deleted
let sandbox_cfg = SandboxConfig {
pid: std::process::id(),
octez_client_dir: TempDir::with_prefix("octez_client")?.into_path(),
octez_client_dir: TempDir::with_prefix("octez_client")?.into_path(), // into_path() causes temp directories to be persisted!
octez_node_dir: TempDir::with_prefix("octez_node")?.into_path(),
octez_rollup_node_dir: TempDir::with_prefix("octez_rollup_node")?.into_path(),
};
Expand All @@ -449,21 +565,16 @@ pub async fn run_sandbox(cfg: &mut Config) -> Result<()> {

// 2. Start sandbox
progress_step(&mut log_file, &mut progress);
let (node, baker, rollup_node) = start_sandbox(&mut log_file, &mut progress, cfg)?;
let mut sandbox = start_sandbox(&mut log_file, &mut progress, cfg)?;
debug!(log_file, "Sandbox started 🎉");

// 3. Save config
progress_step(&mut log_file, &mut progress);
debug!(log_file, "Saving sandbox config");
cfg.save()?;

// 4. Wait for the sandbox or jstz-node to shutdown (either by the user or by an error)
run_jstz_node(cfg).await?;
OctezThread::join(vec![baker, rollup_node, node])?;

cfg.reload()?;
cfg.sandbox = None;
cfg.save()?;
sandbox.join()?;
Ok(())
}

Expand Down Expand Up @@ -649,6 +760,7 @@ pub async fn main(detach: bool, background: bool, cfg: &mut Config) -> Result<()
term::styles::command("jstz sandbox stop").bold()
);
} else {
// TODO: update using mpsc style or shared with mutex
let handle = {
// Clone config to move into the thread
let cfg = cfg.clone();
Expand Down
4 changes: 2 additions & 2 deletions crates/jstz_rollup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,9 @@ fn run(
&addr,
port,
)?;
let thread = OctezThread::from_child(child);
let mut thread = OctezThread::from_child(child);

OctezThread::join(vec![thread])?;
OctezThread::join(&mut vec![&mut thread])?;

Ok(())
}
Expand Down
35 changes: 18 additions & 17 deletions crates/octez/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use signal_hook::{

pub struct OctezThread {
shutdown_tx: Sender<()>,
thread_handle: JoinHandle<Result<()>>,
thread_handle: Option<JoinHandle<Result<()>>>,
}

impl OctezThread {
Expand All @@ -40,7 +40,7 @@ impl OctezThread {

Self {
shutdown_tx,
thread_handle,
thread_handle: Some(thread_handle),
}
}

Expand All @@ -66,26 +66,23 @@ impl OctezThread {

Self {
shutdown_tx,
thread_handle,
thread_handle: Some(thread_handle),
}
}

pub fn is_running(&self) -> bool {
!self.thread_handle.is_finished()
fn is_running(&self) -> bool {
self.thread_handle.is_some()
}

pub fn shutdown(self) -> Result<()> {
self.shutdown_tx.send(())?;
match self.thread_handle.join() {
Ok(result) => result?,
Err(_) => {
// thread panicked
}
};
pub fn shutdown(&mut self) -> Result<()> {
if let Some(handle) = self.thread_handle.take() {
self.shutdown_tx.send(())?;
handle.join().unwrap().unwrap()
}
Ok(())
}

pub fn join(threads: Vec<Self>) -> Result<()> {
pub fn join(threads: &mut Vec<&mut Self>) -> Result<()> {
let mut signals = Signals::new([SIGINT, SIGTERM])?;

// Loop until 1 of the threads fails
Expand All @@ -109,11 +106,15 @@ impl OctezThread {

// Shutdown all running threads
for thread in threads {
if thread.is_running() {
thread.shutdown()?;
}
thread.shutdown()?;
}

Ok(())
}
}

impl Drop for OctezThread {
fn drop(&mut self) {
self.shutdown().unwrap();
}
}

0 comments on commit d87a03c

Please sign in to comment.