From 68e510c66e43e59222869dfdc03b79ef7590f5c7 Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Mon, 18 Sep 2023 06:30:51 +1000 Subject: [PATCH] feat: option to provide eszips for main/event worker functions --- Cargo.lock | 2 + crates/base/src/rt_worker/worker_ctx.rs | 64 ++++++++++++++++++++++--- crates/base/src/server.rs | 27 ++++------- crates/sb_eszip/module_loader.rs | 16 ++++++- crates/sb_worker_context/Cargo.toml | 1 + crates/sb_worker_context/essentials.rs | 6 ++- crates/sb_workers/Cargo.toml | 1 + crates/sb_workers/lib.rs | 3 +- scripts/run.sh | 2 +- 9 files changed, 91 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57955ee89..fb6d457a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3833,6 +3833,7 @@ dependencies = [ "enum-as-inner 0.6.0", "event_worker", "hyper 0.14.27", + "sb_eszip", "serde", "tokio", "uuid", @@ -3849,6 +3850,7 @@ dependencies = [ "event_worker", "hyper 0.14.27", "log", + "sb_eszip", "sb_worker_context", "serde", "tokio", diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 408c38483..c75094f23 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -4,13 +4,15 @@ use crate::utils::units::bytes_to_display; use crate::rt_worker::worker::{Worker, WorkerHandler}; use crate::rt_worker::worker_pool::WorkerPool; -use anyhow::{bail, Error}; +use anyhow::{anyhow, bail, Error}; use cpu_timer::{CPUAlarmVal, CPUTimer}; use event_worker::events::{BootEvent, PseudoEvent, WorkerEventWithMetadata, WorkerEvents}; use hyper::{Body, Request, Response}; use log::{debug, error}; +use sb_eszip::module_loader::EszipPayloadKind; use sb_worker_context::essentials::{ - EventWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts, + EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts, + WorkerRuntimeOpts, }; use std::path::PathBuf; use std::thread; @@ -225,6 +227,42 @@ pub async fn send_user_worker_request( Ok(res) } +pub async fn create_main_worker( + main_worker_path: PathBuf, + import_map_path: Option, + no_module_cache: bool, + user_worker_msgs_tx: mpsc::UnboundedSender, +) -> Result, Error> { + let mut service_path = main_worker_path.clone(); + let mut maybe_eszip = None; + let mut maybe_entrypoint = None; + if let Some(ext) = main_worker_path.extension() { + if ext == "eszip" { + service_path = main_worker_path.parent().unwrap().to_path_buf(); + maybe_eszip = Some(EszipPayloadKind::VecKind(std::fs::read(main_worker_path)?)); + maybe_entrypoint = Some("file:///src/index.ts".to_string()); + } + } + + let main_worker_req_tx = create_worker(WorkerContextInitOpts { + service_path, + import_map_path, + no_module_cache, + events_rx: None, + maybe_eszip, + maybe_entrypoint, + maybe_module_code: None, + conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { + worker_pool_tx: user_worker_msgs_tx, + }), + env_vars: std::env::vars().collect(), + }) + .await + .map_err(|err| anyhow!("main worker boot error: {}", err))?; + + Ok(main_worker_req_tx) +} + pub async fn create_events_worker( events_worker_path: PathBuf, import_map_path: Option, @@ -232,18 +270,32 @@ pub async fn create_events_worker( ) -> Result, Error> { let (events_tx, events_rx) = mpsc::unbounded_channel::(); + let mut service_path = events_worker_path.clone(); + let mut maybe_eszip = None; + let mut maybe_entrypoint = None; + if let Some(ext) = events_worker_path.extension() { + if ext == "eszip" { + service_path = events_worker_path.parent().unwrap().to_path_buf(); + maybe_eszip = Some(EszipPayloadKind::VecKind(std::fs::read( + events_worker_path, + )?)); + maybe_entrypoint = Some("file:///src/index.ts".to_string()); + } + } + let _ = create_worker(WorkerContextInitOpts { - service_path: events_worker_path, + service_path, no_module_cache, import_map_path, env_vars: std::env::vars().collect(), events_rx: Some(events_rx), - maybe_eszip: None, - maybe_entrypoint: None, + maybe_eszip, + maybe_entrypoint, maybe_module_code: None, conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}), }) - .await?; + .await + .map_err(|err| anyhow!("events worker boot error: {}", err))?; Ok(events_tx) } diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index c5130eb42..c7c9b2149 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -1,13 +1,10 @@ use crate::rt_worker::worker_ctx::{ - create_events_worker, create_user_worker_pool, create_worker, WorkerRequestMsg, + create_events_worker, create_main_worker, create_user_worker_pool, WorkerRequestMsg, }; use anyhow::Error; use event_worker::events::WorkerEventWithMetadata; use hyper::{server::conn::Http, service::Service, Body, Request, Response}; use log::{debug, error, info}; -use sb_worker_context::essentials::{ - MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts, -}; use std::future::Future; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -101,8 +98,7 @@ impl Server { let events_worker = create_events_worker(events_path_buf, import_map_path.clone(), no_module_cache) - .await - .expect("Event worker could not be created"); + .await?; worker_events_sender = Some(events_worker); } @@ -111,20 +107,13 @@ impl Server { let user_worker_msgs_tx = create_user_worker_pool(worker_events_sender).await?; // create main worker - let main_path = Path::new(&main_service_path); - let main_worker_req_tx = create_worker(WorkerContextInitOpts { - service_path: main_path.to_path_buf(), - import_map_path, + let main_worker_path = Path::new(&main_service_path).to_path_buf(); + let main_worker_req_tx = create_main_worker( + main_worker_path, + import_map_path.clone(), no_module_cache, - events_rx: None, - maybe_eszip: None, - maybe_entrypoint: None, - maybe_module_code: None, - conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts { - worker_pool_tx: user_worker_msgs_tx, - }), - env_vars: std::env::vars().collect(), - }) + user_worker_msgs_tx, + ) .await?; // register alarm signal handler diff --git a/crates/sb_eszip/module_loader.rs b/crates/sb_eszip/module_loader.rs index d5bdbef48..8bc97f80b 100644 --- a/crates/sb_eszip/module_loader.rs +++ b/crates/sb_eszip/module_loader.rs @@ -18,9 +18,21 @@ pub struct EszipModuleLoader { maybe_import_map: Option, } +#[derive(Debug)] +pub enum EszipPayloadKind { + JsBufferKind(JsBuffer), + VecKind(Vec), +} + impl EszipModuleLoader { - pub async fn new(bytes: JsBuffer, maybe_import_map_url: Option) -> Result { - let bytes = Vec::from(&*bytes); + pub async fn new( + eszip_payload: EszipPayloadKind, + maybe_import_map_url: Option, + ) -> Result { + let bytes = match eszip_payload { + EszipPayloadKind::JsBufferKind(js_buffer) => Vec::from(&*js_buffer), + EszipPayloadKind::VecKind(vec) => vec, + }; let bufreader = BufReader::new(AllowStdIo::new(bytes.as_slice())); let (eszip, loader) = eszip::EszipV2::parse(bufreader).await?; diff --git a/crates/sb_worker_context/Cargo.toml b/crates/sb_worker_context/Cargo.toml index 6ff8c45c6..a83577748 100644 --- a/crates/sb_worker_context/Cargo.toml +++ b/crates/sb_worker_context/Cargo.toml @@ -17,5 +17,6 @@ event_worker ={ version = "0.1.0", path = "../event_worker" } enum-as-inner = "0.6.0" hyper.workspace = true serde.workspace = true +sb_eszip = { version = "0.1.0", path = "../sb_eszip" } tokio.workspace = true uuid.workspace = true diff --git a/crates/sb_worker_context/essentials.rs b/crates/sb_worker_context/essentials.rs index 41ee0d09e..3fd912eea 100644 --- a/crates/sb_worker_context/essentials.rs +++ b/crates/sb_worker_context/essentials.rs @@ -1,5 +1,5 @@ use anyhow::Error; -use deno_core::{FastString, JsBuffer}; +use deno_core::FastString; use enum_as_inner::EnumAsInner; use event_worker::events::WorkerEventWithMetadata; use hyper::{Body, Request, Response}; @@ -8,6 +8,8 @@ use std::path::PathBuf; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +use sb_eszip::module_loader::EszipPayloadKind; + #[derive(Debug, Clone)] pub struct UserWorkerRuntimeOpts { pub service_path: Option, @@ -78,7 +80,7 @@ pub struct WorkerContextInitOpts { pub env_vars: HashMap, pub events_rx: Option>, pub conf: WorkerRuntimeOpts, - pub maybe_eszip: Option, + pub maybe_eszip: Option, pub maybe_module_code: Option, pub maybe_entrypoint: Option, } diff --git a/crates/sb_workers/Cargo.toml b/crates/sb_workers/Cargo.toml index 16f4f4f10..9bf964b02 100644 --- a/crates/sb_workers/Cargo.toml +++ b/crates/sb_workers/Cargo.toml @@ -17,6 +17,7 @@ deno_core.workspace = true tokio.workspace = true deno_http.workspace = true hyper.workspace = true +sb_eszip = { version = "0.1.0", path = "../sb_eszip" } serde.workspace = true bytes.workspace = true log.workspace = true diff --git a/crates/sb_workers/lib.rs b/crates/sb_workers/lib.rs index 62aab81e0..8af2532c5 100644 --- a/crates/sb_workers/lib.rs +++ b/crates/sb_workers/lib.rs @@ -10,6 +10,7 @@ use deno_core::{ use hyper::body::HttpBody; use hyper::header::{HeaderName, HeaderValue}; use hyper::{Body, Request, Response}; +use sb_eszip::module_loader::EszipPayloadKind; use sb_worker_context::essentials::{ CreateUserWorkerResult, UserWorkerMsgs, UserWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts, @@ -100,7 +101,7 @@ pub async fn op_user_worker_create( import_map_path, env_vars: env_vars_map, events_rx: None, - maybe_eszip, + maybe_eszip: maybe_eszip.map(EszipPayloadKind::JsBufferKind), maybe_entrypoint, maybe_module_code: maybe_module_code.map(|v| v.into()), conf: WorkerRuntimeOpts::UserWorker(UserWorkerRuntimeOpts { diff --git a/scripts/run.sh b/scripts/run.sh index 8bce52ce6..f5579ef5b 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -1,3 +1,3 @@ #!/usr/bin/env bash - GIT_V_TAG=0.1.1 cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-worker ./examples/event-manager +GIT_V_TAG=0.1.1 cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-worker ./examples/event-manager