Skip to content

Commit

Permalink
fix: Merge pull request #155 from supabase/refactor-edge-rt-pt-2
Browse files Browse the repository at this point in the history
Refactor edge rt pt 2
  • Loading branch information
laktek authored Aug 3, 2023
2 parents 86add0a + 6adf76d commit 448dcd1
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 74 deletions.
4 changes: 2 additions & 2 deletions crates/base/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub async fn start_server(
ip: &str,
port: u16,
main_service_path: String,
event_manager_path: Option<String>,
event_worker_path: Option<String>,
import_map_path: Option<String>,
no_module_cache: bool,
callback_tx: Option<Sender<ServerCodes>>,
Expand All @@ -15,7 +15,7 @@ pub async fn start_server(
ip,
port,
main_service_path,
event_manager_path,
event_worker_path,
import_map_path,
no_module_cache,
callback_tx,
Expand Down
18 changes: 3 additions & 15 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use anyhow::{anyhow, Error};
use deno_core::error::AnyError;
use deno_core::url::Url;
use deno_core::{located_script_name, serde_v8, JsRuntime, ModuleId, RuntimeOptions};
use import_map::{parse_from_json, ImportMap, ImportMapDiagnostic};
use log::{error, warn};
use import_map::{parse_from_json, ImportMap};
use log::error;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
Expand All @@ -21,6 +21,7 @@ use crate::{errors_rt, snapshot};
use event_worker::events::{EventMetadata, WorkerEventWithMetadata};
use event_worker::js_interceptors::sb_events_js_interceptors;
use event_worker::sb_user_event_worker;
use module_fetcher::util::print_import_map_diagnostics;
use module_loader::DefaultModuleLoader;
use sb_core::http_start::sb_core_http;
use sb_core::net::sb_core_net;
Expand Down Expand Up @@ -61,19 +62,6 @@ fn load_import_map(maybe_path: Option<String>) -> Result<Option<ImportMap>, Erro
}
}

fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
if !diagnostics.is_empty() {
warn!(
"Import map diagnostics:\n{}",
diagnostics
.iter()
.map(|d| format!(" - {d}"))
.collect::<Vec<_>>()
.join("\n")
);
}
}

pub struct DenoRuntimeError(Error);

impl PartialEq for DenoRuntimeError {
Expand Down
6 changes: 3 additions & 3 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::deno_runtime::DenoRuntime;
use crate::rt_worker::utils::{get_event_metadata, parse_worker_conf};
use crate::rt_worker::worker_ctx::create_supervisor;
use crate::utils::send_event_if_event_manager_available;
use crate::utils::send_event_if_event_worker_available;
use anyhow::{anyhow, bail, Error};
use cpu_timer::get_thread_time;
use event_worker::events::{
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Worker {

match result {
Ok(event) => {
send_event_if_event_manager_available(
send_event_if_event_worker_available(
events_msg_tx.clone(),
event,
event_metadata.clone(),
Expand All @@ -144,7 +144,7 @@ impl Worker {
format!("CPU time used: {:?}ms", (end_time - start_time) / 1_000_000);

debug!("{}", cpu_time_msg);
send_event_if_event_manager_available(
send_event_if_event_worker_available(
events_msg_tx,
WorkerEvents::Log(LogEvent {
msg: cpu_time_msg,
Expand Down
4 changes: 2 additions & 2 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::deno_runtime::DenoRuntime;
use crate::utils::send_event_if_event_manager_available;
use crate::utils::send_event_if_event_worker_available;
use crate::utils::units::bytes_to_display;

use crate::rt_worker::worker::{Worker, WorkerHandler};
Expand Down Expand Up @@ -191,7 +191,7 @@ pub async fn create_worker(
.worker_boot_start_time
.elapsed()
.as_millis();
send_event_if_event_manager_available(
send_event_if_event_worker_available(
worker_struct_ref.events_msg_tx.clone(),
WorkerEvents::Boot(BootEvent {
boot_time: elapsed as usize,
Expand Down
8 changes: 4 additions & 4 deletions crates/base/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use tokio::sync::mpsc;

pub mod units;

pub fn send_event_if_event_manager_available(
maybe_event_manager: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>>,
pub fn send_event_if_event_worker_available(
maybe_event_worker: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>>,
event: WorkerEvents,
metadata: EventMetadata,
) {
if let Some(event_manager) = maybe_event_manager {
let _ = event_manager.send(WorkerEventWithMetadata { event, metadata });
if let Some(event_worker) = maybe_event_worker {
let _ = event_worker.send(WorkerEventWithMetadata { event, metadata });
}
}
4 changes: 2 additions & 2 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn cli() -> Command {
.arg(arg!(--"main-service" <DIR> "Path to main service directory").default_value("examples/main"))
.arg(arg!(--"disable-module-cache" "Disable using module cache").default_value("false").value_parser(FalseyValueParser::new()))
.arg(arg!(--"import-map" <Path> "Path to import map file"))
.arg(arg!(--"event-manager" <Path> "Path to event manager directory"))
.arg(arg!(--"event-worker" <Path> "Path to event worker directory"))
)
}

Expand Down Expand Up @@ -85,7 +85,7 @@ fn main() -> Result<(), anyhow::Error> {
.cloned()
.unwrap();
let event_service_manager_path =
sub_matches.get_one::<String>("event-manager").cloned();
sub_matches.get_one::<String>("event-worker").cloned();

start_server(
ip.as_str(),
Expand Down
16 changes: 1 addition & 15 deletions crates/module_fetcher/src/args/import_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::url::Url;
use import_map::ImportMap;
use import_map::ImportMapDiagnostic;
use log::warn;

use crate::file_fetcher::get_source_from_data_url;
use crate::file_fetcher::FileFetcher;
use crate::permissions::Permissions;
use crate::util::print_import_map_diagnostics;

pub async fn resolve_import_map_from_specifier(
specifier: &Url,
Expand Down Expand Up @@ -49,16 +48,3 @@ fn import_map_from_value(
print_import_map_diagnostics(&result.diagnostics);
Ok(result.import_map)
}

fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
if !diagnostics.is_empty() {
warn!(
"Import map diagnostics:\n{}",
diagnostics
.iter()
.map(|d| format!(" - {d}"))
.collect::<Vec<_>>()
.join("\n")
);
}
}
16 changes: 16 additions & 0 deletions crates/module_fetcher/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use import_map::ImportMapDiagnostic;
use log::warn;

pub mod cache_setting;
pub mod checksum;
pub mod config_file;
Expand All @@ -6,3 +9,16 @@ pub mod fs;
pub mod path;
pub mod text_encoding;
pub mod version;

pub fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
if !diagnostics.is_empty() {
warn!(
"Import map diagnostics:\n{}",
diagnostics
.iter()
.map(|d| format!(" - {d}"))
.collect::<Vec<_>>()
.join("\n")
);
}
}
30 changes: 0 additions & 30 deletions crates/sb_workers/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ use deno_core::{
AsyncRefCell, AsyncResult, BufView, ByteString, CancelFuture, CancelHandle, CancelTryFuture,
OpState, RcRef, Resource, ResourceId, WriteOutcome,
};
use event_worker::events::{
EventMetadata, LogEvent, LogLevel, WorkerEventWithMetadata, WorkerEvents,
};
use hyper::body::HttpBody;
use hyper::header::{HeaderName, HeaderValue};
use hyper::{Body, Request, Response};
use log::error;
use sb_worker_context::essentials::{
CreateUserWorkerResult, UserWorkerMsgs, UserWorkerRuntimeOpts, WorkerContextInitOpts,
WorkerRuntimeOpts,
Expand All @@ -34,7 +30,6 @@ deno_core::extension!(
op_user_worker_create,
op_user_worker_fetch_build,
op_user_worker_fetch_send,
op_user_worker_log,
],
esm = ["user_workers.js"]
);
Expand Down Expand Up @@ -137,31 +132,6 @@ pub async fn op_user_worker_create(
Ok(result.unwrap().key.to_string())
}

#[op]
pub fn op_user_worker_log(state: &mut OpState, msg: &str, is_err: bool) -> Result<(), AnyError> {
let maybe_tx = state.try_borrow::<mpsc::UnboundedSender<WorkerEventWithMetadata>>();
let mut level = LogLevel::Info;
if is_err {
level = LogLevel::Error;
}
if maybe_tx.is_some() {
let event_metadata = state
.try_borrow::<EventMetadata>()
.unwrap_or(&EventMetadata::default())
.clone();
maybe_tx.unwrap().send(WorkerEventWithMetadata {
event: WorkerEvents::Log(LogEvent {
msg: msg.to_string(),
level,
}),
metadata: event_metadata,
})?;
} else {
error!("[{:?}] {}", level, msg);
}
Ok(())
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct UserWorkerRequest {
Expand Down
2 changes: 1 addition & 1 deletion scripts/run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env bash

cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-manager ./examples/event-manager
cargo build && RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" start --main-service ./examples/main --event-worker ./examples/event-manager

0 comments on commit 448dcd1

Please sign in to comment.