Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor edge rt 2 #153

Merged
merged 6 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 112 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ members = [
"./crates/flaky_test",
"./crates/sb_os",
"./crates/node",
"./crates/cpu_timer"
"./crates/cpu_timer",
"./crates/event_worker"
]
resolver = "2"

Expand Down
2 changes: 2 additions & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reqwest = { version = "0.11.13" }
serde = { version = "1.0.149", features = ["derive"] }
tokio = { workspace = true }
url = { version = "2.3.1" }
event_worker ={ version = "0.1.0", path = "../event_worker" }
sb_worker_context = { version = "0.1.0", path = "../sb_worker_context" }
sb_workers = { version = "0.1.0", path = "../sb_workers" }
sb_env = { version = "0.1.0", path = "../sb_env" }
Expand Down Expand Up @@ -75,6 +76,7 @@ reqwest = { version = "0.11.13" }
serde = { version = "1.0.149", features = ["derive"] }
tokio.workspace = true
url = { version = "2.3.1" }
event_worker ={ version = "0.1.0", path = "../event_worker" }
sb_worker_context = { version = "0.1.0", path = "../sb_worker_context" }
sb_workers = { version = "0.1.0", path = "../sb_workers" }
sb_env = { version = "0.1.0", path = "../sb_env" }
Expand Down
4 changes: 3 additions & 1 deletion crates/base/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ mod supabase_startup_snapshot {
use deno_core::Extension;
use deno_core::ExtensionFileSource;
use deno_core::ModuleCode;
use event_worker::js_interceptors::sb_events_js_interceptors;
use event_worker::sb_user_event_worker;
use sb_core::http_start::sb_core_http;
use sb_core::net::sb_core_net;
use sb_core::permissions::sb_core_permissions;
use sb_core::runtime::sb_core_runtime;
use sb_core::sb_core_main_js;
use sb_env::sb_env;
use sb_workers::events::sb_user_event_worker;
use sb_workers::sb_user_workers;
use std::path::Path;

Expand Down Expand Up @@ -204,6 +205,7 @@ mod supabase_startup_snapshot {
sb_os::sb_os::init_ops_and_esm(),
sb_user_workers::init_ops_and_esm(),
sb_user_event_worker::init_ops_and_esm(),
sb_events_js_interceptors::init_ops_and_esm(),
sb_core_main_js::init_ops_and_esm(),
sb_core_net::init_ops_and_esm(),
sb_core_http::init_ops_and_esm(),
Expand Down
6 changes: 4 additions & 2 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use tokio::sync::mpsc;
use urlencoding::decode;

use crate::{errors_rt, snapshot};
use event_worker::events::{EventMetadata, WorkerEventWithMetadata};
use event_worker::js_interceptors::sb_events_js_interceptors;
use event_worker::sb_user_event_worker;
use module_loader::DefaultModuleLoader;
use sb_core::http_start::sb_core_http;
use sb_core::net::sb_core_net;
Expand All @@ -26,8 +29,6 @@ use sb_core::runtime::sb_core_runtime;
use sb_core::sb_core_main_js;
use sb_env::sb_env as sb_env_op;
use sb_worker_context::essentials::{UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts};
use sb_worker_context::events::{EventMetadata, WorkerEventWithMetadata};
use sb_workers::events::sb_user_event_worker;
use sb_workers::sb_user_workers;

fn load_import_map(maybe_path: Option<String>) -> Result<Option<ImportMap>, Error> {
Expand Down Expand Up @@ -172,6 +173,7 @@ impl DenoRuntime {
sb_os::sb_os::init_ops(),
sb_user_workers::init_ops(),
sb_user_event_worker::init_ops(),
sb_events_js_interceptors::init_ops(),
sb_core_main_js::init_ops(),
sb_core_net::init_ops(),
sb_core_http::init_ops(),
Expand Down
2 changes: 1 addition & 1 deletion crates/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod deno_runtime;
pub mod errors_rt;
pub mod js_worker;
pub mod macros;
pub mod rt_worker;
pub mod server;
pub mod snapshot;
pub mod utils;
pub mod worker_ctx;
49 changes: 49 additions & 0 deletions crates/base/src/rt_worker/implementation/default_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::deno_runtime::DenoRuntime;
use crate::rt_worker::worker::{HandleCreationType, Worker, WorkerHandler};
use anyhow::Error;
use event_worker::events::{BootFailure, PseudoEvent, UncaughtException, WorkerEvents};
use std::any::Any;
use tokio::net::UnixStream;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Receiver;

impl WorkerHandler for Worker {
fn handle_error(&self, error: Error) -> Result<WorkerEvents, Error> {
println!("{}", error);
Ok(WorkerEvents::BootFailure(BootFailure {
msg: error.to_string(),
}))
}

fn handle_creation(
&self,
created_rt: DenoRuntime,
unix_stream_rx: UnboundedReceiver<UnixStream>,
termination_event_rx: Receiver<WorkerEvents>,
) -> HandleCreationType {
let run_worker_rt = async {
match created_rt.run(unix_stream_rx).await {
// if the error is execution terminated, check termination event reason
Err(err) => {
let err_string = err.to_string();
if err_string.ends_with("execution terminated")
|| err_string.ends_with("wall clock duration reached")
{
Ok(termination_event_rx.await.unwrap())
} else {
Ok(WorkerEvents::UncaughtException(UncaughtException {
exception: err_string,
}))
}
}
Ok(()) => Ok(WorkerEvents::EventLoopCompleted(PseudoEvent {})),
}
};

Box::pin(run_worker_rt)
}

fn as_any(&self) -> &dyn Any {
self
}
}
1 change: 1 addition & 0 deletions crates/base/src/rt_worker/implementation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod default_handler;
5 changes: 5 additions & 0 deletions crates/base/src/rt_worker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod implementation;
pub mod utils;
pub mod worker;
pub mod worker_ctx;
pub mod worker_pool;
44 changes: 44 additions & 0 deletions crates/base/src/rt_worker/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use event_worker::events::{EventMetadata, WorkerEventWithMetadata};
use sb_worker_context::essentials::{UserWorkerMsgs, WorkerRuntimeOpts};
use tokio::sync::mpsc::UnboundedSender;

type WorkerCoreConfig = (
Option<u64>,
Option<UnboundedSender<UserWorkerMsgs>>,
Option<UnboundedSender<WorkerEventWithMetadata>>,
String,
);

pub fn parse_worker_conf(conf: &WorkerRuntimeOpts) -> WorkerCoreConfig {
let worker_core: WorkerCoreConfig = match conf {
WorkerRuntimeOpts::UserWorker(worker_opts) => (
worker_opts.key,
worker_opts.pool_msg_tx.clone(),
worker_opts.events_msg_tx.clone(),
worker_opts
.key
.map(|k| format!("sb-iso-{:?}", k))
.unwrap_or("isolate-worker-unknown".to_string()),
),
WorkerRuntimeOpts::MainWorker(_) => (None, None, None, "main-worker".to_string()),
WorkerRuntimeOpts::EventsWorker(_) => (None, None, None, "events-worker".to_string()),
};

worker_core
}

pub fn get_event_metadata(conf: &WorkerRuntimeOpts) -> EventMetadata {
let mut event_metadata = EventMetadata {
service_path: None,
execution_id: None,
};
if conf.is_user_worker() {
let conf = conf.as_user_worker().unwrap();
event_metadata = EventMetadata {
service_path: conf.service_path.clone(),
execution_id: conf.execution_id,
};
}

event_metadata
}
171 changes: 171 additions & 0 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use crate::deno_runtime::DenoRuntime;
use crate::rt_worker::utils::{get_event_metadata, parse_worker_conf};
use crate::rt_worker::worker_ctx::create_supervisor;
use crate::utils::send_event_if_event_manager_available;
use anyhow::{anyhow, bail, Error};
use cpu_timer::get_thread_time;
use event_worker::events::{
EventMetadata, LogEvent, LogLevel, WorkerEventWithMetadata, WorkerEvents,
};
use log::{debug, error};
use sb_worker_context::essentials::{UserWorkerMsgs, WorkerContextInitOpts};
use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::thread;
use tokio::net::UnixStream;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::time::Instant;

#[derive(Clone)]
pub struct Worker {
pub worker_boot_start_time: Instant,
pub events_msg_tx: Option<UnboundedSender<WorkerEventWithMetadata>>,
pub pool_msg_tx: Option<UnboundedSender<UserWorkerMsgs>>,
pub event_metadata: EventMetadata,
pub worker_key: Option<u64>,
pub thread_name: String,
}

pub type HandleCreationType = Pin<Box<dyn Future<Output = Result<WorkerEvents, Error>>>>;

pub trait WorkerHandler: Send {
fn handle_error(&self, error: Error) -> Result<WorkerEvents, Error>;
fn handle_creation(
&self,
created_rt: DenoRuntime,
unix_stream_rx: UnboundedReceiver<UnixStream>,
termination_event_rx: Receiver<WorkerEvents>,
) -> HandleCreationType;
fn as_any(&self) -> &dyn Any;
}

impl Worker {
pub fn new(init_opts: &WorkerContextInitOpts) -> Result<Self, Error> {
let service_path = init_opts.service_path.clone();

let (worker_key, pool_msg_tx, events_msg_tx, thread_name) =
parse_worker_conf(&init_opts.conf);
let event_metadata = get_event_metadata(&init_opts.conf);

if !service_path.exists() {
bail!("service does not exist {:?}", &service_path)
}

let worker_boot_start_time = Instant::now();

Ok(Self {
worker_boot_start_time,
events_msg_tx,
pool_msg_tx,
event_metadata,
worker_key,
thread_name,
})
}

pub fn start(
&self,
opts: WorkerContextInitOpts,
unix_channel_rx: UnboundedReceiver<UnixStream>,
booter_signal: Sender<Result<(), Error>>,
) {
let thread_name = self.thread_name.clone();
let events_msg_tx = self.events_msg_tx.clone();
let event_metadata = self.event_metadata.clone();
let worker_key = self.worker_key;
let pool_msg_tx = self.pool_msg_tx.clone();
let method_cloner = self.clone();

let _handle: thread::JoinHandle<Result<(), Error>> = thread::Builder::new()
.name(thread_name)
.spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();

let mut start_time = 0;

let result: Result<WorkerEvents, Error> = local.block_on(&runtime, async {
match DenoRuntime::new(opts).await {
Ok(mut new_runtime) => {
let _ = booter_signal.send(Ok(()));

// CPU TIMER
let (termination_event_tx, termination_event_rx) =
oneshot::channel::<WorkerEvents>();
let _cputimer;

// TODO: Allow customization of supervisor
if new_runtime.conf.is_user_worker() {
start_time = get_thread_time()?;

// cputimer is returned from supervisor and assigned here to keep it in scope.
_cputimer = create_supervisor(
worker_key.unwrap_or(0),
&mut new_runtime,
termination_event_tx,
)?;
}

// TODO: Should we handle it gracefully?
start_time = get_thread_time()?;
let data = method_cloner.handle_creation(
new_runtime,
unix_channel_rx,
termination_event_rx,
);
data.await
}
Err(err) => {
let _ = booter_signal.send(Err(anyhow!("worker boot error")));
method_cloner.handle_error(err)
}
}
});

match result {
Ok(event) => {
send_event_if_event_manager_available(
events_msg_tx.clone(),
event,
event_metadata.clone(),
);
}
Err(err) => error!("unexpected worker error {}", err),
};

let end_time = get_thread_time()?;
let cpu_time_msg =
format!("CPU time used: {:?}ms", (end_time - start_time) / 1_000_000);

debug!("{}", cpu_time_msg);
send_event_if_event_manager_available(
events_msg_tx,
WorkerEvents::Log(LogEvent {
msg: cpu_time_msg,
level: LogLevel::Info,
}),
event_metadata,
);

worker_key.and_then(|worker_key_unwrapped| {
pool_msg_tx.map(|tx| {
if let Err(err) = tx.send(UserWorkerMsgs::Shutdown(worker_key_unwrapped)) {
error!(
"failed to send the shutdown signal to user worker pool: {:?}",
err
);
}
})
});

Ok(())
})
.unwrap();
}
}
Loading