Skip to content

Commit

Permalink
Merge pull request #178 from supabase/eszip-for-main-n-events
Browse files Browse the repository at this point in the history
feat: option to provide eszips for main/event worker functions
  • Loading branch information
laktek authored Sep 18, 2023
2 parents c19dc58 + 68e510c commit 9387aec
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 31 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 58 additions & 6 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use crate::utils::units::bytes_to_display;

use crate::rt_worker::worker::{Worker, WorkerHandler};
use crate::rt_worker::worker_pool::WorkerPool;
use anyhow::{bail, Error};
use anyhow::{anyhow, bail, Error};
use cpu_timer::{CPUAlarmVal, CPUTimer};
use event_worker::events::{BootEvent, PseudoEvent, WorkerEventWithMetadata, WorkerEvents};
use hyper::{Body, Request, Response};
use log::{debug, error};
use sb_eszip::module_loader::EszipPayloadKind;
use sb_worker_context::essentials::{
EventWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts, WorkerRuntimeOpts,
EventWorkerRuntimeOpts, MainWorkerRuntimeOpts, UserWorkerMsgs, WorkerContextInitOpts,
WorkerRuntimeOpts,
};
use std::path::PathBuf;
use std::thread;
Expand Down Expand Up @@ -225,25 +227,75 @@ pub async fn send_user_worker_request(
Ok(res)
}

pub async fn create_main_worker(
main_worker_path: PathBuf,
import_map_path: Option<String>,
no_module_cache: bool,
user_worker_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
) -> Result<mpsc::UnboundedSender<WorkerRequestMsg>, Error> {
let mut service_path = main_worker_path.clone();
let mut maybe_eszip = None;
let mut maybe_entrypoint = None;
if let Some(ext) = main_worker_path.extension() {
if ext == "eszip" {
service_path = main_worker_path.parent().unwrap().to_path_buf();
maybe_eszip = Some(EszipPayloadKind::VecKind(std::fs::read(main_worker_path)?));
maybe_entrypoint = Some("file:///src/index.ts".to_string());
}
}

let main_worker_req_tx = create_worker(WorkerContextInitOpts {
service_path,
import_map_path,
no_module_cache,
events_rx: None,
maybe_eszip,
maybe_entrypoint,
maybe_module_code: None,
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx: user_worker_msgs_tx,
}),
env_vars: std::env::vars().collect(),
})
.await
.map_err(|err| anyhow!("main worker boot error: {}", err))?;

Ok(main_worker_req_tx)
}

pub async fn create_events_worker(
events_worker_path: PathBuf,
import_map_path: Option<String>,
no_module_cache: bool,
) -> Result<mpsc::UnboundedSender<WorkerEventWithMetadata>, Error> {
let (events_tx, events_rx) = mpsc::unbounded_channel::<WorkerEventWithMetadata>();

let mut service_path = events_worker_path.clone();
let mut maybe_eszip = None;
let mut maybe_entrypoint = None;
if let Some(ext) = events_worker_path.extension() {
if ext == "eszip" {
service_path = events_worker_path.parent().unwrap().to_path_buf();
maybe_eszip = Some(EszipPayloadKind::VecKind(std::fs::read(
events_worker_path,
)?));
maybe_entrypoint = Some("file:///src/index.ts".to_string());
}
}

let _ = create_worker(WorkerContextInitOpts {
service_path: events_worker_path,
service_path,
no_module_cache,
import_map_path,
env_vars: std::env::vars().collect(),
events_rx: Some(events_rx),
maybe_eszip: None,
maybe_entrypoint: None,
maybe_eszip,
maybe_entrypoint,
maybe_module_code: None,
conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}),
})
.await?;
.await
.map_err(|err| anyhow!("events worker boot error: {}", err))?;

Ok(events_tx)
}
Expand Down
27 changes: 8 additions & 19 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::rt_worker::worker_ctx::{
create_events_worker, create_user_worker_pool, create_worker, WorkerRequestMsg,
create_events_worker, create_main_worker, create_user_worker_pool, WorkerRequestMsg,
};
use anyhow::Error;
use event_worker::events::WorkerEventWithMetadata;
use hyper::{server::conn::Http, service::Service, Body, Request, Response};
use log::{debug, error, info};
use sb_worker_context::essentials::{
MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts,
};
use std::future::Future;
use std::net::IpAddr;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -101,8 +98,7 @@ impl Server {

let events_worker =
create_events_worker(events_path_buf, import_map_path.clone(), no_module_cache)
.await
.expect("Event worker could not be created");
.await?;

worker_events_sender = Some(events_worker);
}
Expand All @@ -111,20 +107,13 @@ impl Server {
let user_worker_msgs_tx = create_user_worker_pool(worker_events_sender).await?;

// create main worker
let main_path = Path::new(&main_service_path);
let main_worker_req_tx = create_worker(WorkerContextInitOpts {
service_path: main_path.to_path_buf(),
import_map_path,
let main_worker_path = Path::new(&main_service_path).to_path_buf();
let main_worker_req_tx = create_main_worker(
main_worker_path,
import_map_path.clone(),
no_module_cache,
events_rx: None,
maybe_eszip: None,
maybe_entrypoint: None,
maybe_module_code: None,
conf: WorkerRuntimeOpts::MainWorker(MainWorkerRuntimeOpts {
worker_pool_tx: user_worker_msgs_tx,
}),
env_vars: std::env::vars().collect(),
})
user_worker_msgs_tx,
)
.await?;

// register alarm signal handler
Expand Down
16 changes: 14 additions & 2 deletions crates/sb_eszip/module_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,21 @@ pub struct EszipModuleLoader {
maybe_import_map: Option<ImportMap>,
}

#[derive(Debug)]
pub enum EszipPayloadKind {
JsBufferKind(JsBuffer),
VecKind(Vec<u8>),
}

impl EszipModuleLoader {
pub async fn new(bytes: JsBuffer, maybe_import_map_url: Option<String>) -> Result<Self, Error> {
let bytes = Vec::from(&*bytes);
pub async fn new(
eszip_payload: EszipPayloadKind,
maybe_import_map_url: Option<String>,
) -> Result<Self, Error> {
let bytes = match eszip_payload {
EszipPayloadKind::JsBufferKind(js_buffer) => Vec::from(&*js_buffer),
EszipPayloadKind::VecKind(vec) => vec,
};

let bufreader = BufReader::new(AllowStdIo::new(bytes.as_slice()));
let (eszip, loader) = eszip::EszipV2::parse(bufreader).await?;
Expand Down
1 change: 1 addition & 0 deletions crates/sb_worker_context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ event_worker ={ version = "0.1.0", path = "../event_worker" }
enum-as-inner = "0.6.0"
hyper.workspace = true
serde.workspace = true
sb_eszip = { version = "0.1.0", path = "../sb_eszip" }
tokio.workspace = true
uuid.workspace = true
6 changes: 4 additions & 2 deletions crates/sb_worker_context/essentials.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Error;
use deno_core::{FastString, JsBuffer};
use deno_core::FastString;
use enum_as_inner::EnumAsInner;
use event_worker::events::WorkerEventWithMetadata;
use hyper::{Body, Request, Response};
Expand All @@ -8,6 +8,8 @@ use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use sb_eszip::module_loader::EszipPayloadKind;

#[derive(Debug, Clone)]
pub struct UserWorkerRuntimeOpts {
pub service_path: Option<String>,
Expand Down Expand Up @@ -78,7 +80,7 @@ pub struct WorkerContextInitOpts {
pub env_vars: HashMap<String, String>,
pub events_rx: Option<mpsc::UnboundedReceiver<WorkerEventWithMetadata>>,
pub conf: WorkerRuntimeOpts,
pub maybe_eszip: Option<JsBuffer>,
pub maybe_eszip: Option<EszipPayloadKind>,
pub maybe_module_code: Option<FastString>,
pub maybe_entrypoint: Option<String>,
}
Expand Down
1 change: 1 addition & 0 deletions crates/sb_workers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ deno_core.workspace = true
tokio.workspace = true
deno_http.workspace = true
hyper.workspace = true
sb_eszip = { version = "0.1.0", path = "../sb_eszip" }
serde.workspace = true
bytes.workspace = true
log.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/sb_workers/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use deno_core::{
use hyper::body::HttpBody;
use hyper::header::{HeaderName, HeaderValue};
use hyper::{Body, Request, Response};
use sb_eszip::module_loader::EszipPayloadKind;
use sb_worker_context::essentials::{
CreateUserWorkerResult, UserWorkerMsgs, UserWorkerRuntimeOpts, WorkerContextInitOpts,
WorkerRuntimeOpts,
Expand Down Expand Up @@ -100,7 +101,7 @@ pub async fn op_user_worker_create(
import_map_path,
env_vars: env_vars_map,
events_rx: None,
maybe_eszip,
maybe_eszip: maybe_eszip.map(EszipPayloadKind::JsBufferKind),
maybe_entrypoint,
maybe_module_code: maybe_module_code.map(|v| v.into()),
conf: WorkerRuntimeOpts::UserWorker(UserWorkerRuntimeOpts {
Expand Down
2 changes: 1 addition & 1 deletion scripts/run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env bash

GIT_V_TAG=0.1.1 cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-worker ./examples/event-manager
GIT_V_TAG=0.1.1 cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-worker ./examples/event-manager

0 comments on commit 9387aec

Please sign in to comment.