diff --git a/examples/typegraphs/metagen/rs/fdk.rs b/examples/typegraphs/metagen/rs/fdk.rs index 06e8b0b0a..4969ee601 100644 --- a/examples/typegraphs/metagen/rs/fdk.rs +++ b/examples/typegraphs/metagen/rs/fdk.rs @@ -109,7 +109,7 @@ impl Router { } pub fn init(&self, args: InitArgs) -> Result { - static MT_VERSION: &str = "0.5.0-rc.3"; + static MT_VERSION: &str = "0.5.0-rc.4"; if args.metatype_version != MT_VERSION { return Err(InitError::VersionMismatch(MT_VERSION.into())); } diff --git a/src/meta-cli/src/cli/gen.rs b/src/meta-cli/src/cli/gen.rs index 5ee1c8a29..9d3c7236a 100644 --- a/src/meta-cli/src/cli/gen.rs +++ b/src/meta-cli/src/cli/gen.rs @@ -158,7 +158,7 @@ impl InputResolver for MetagenCtx { .join(path) .canonicalize() .wrap_err("unable to canonicalize typegraph path, make sure it exists")?; - let raw = load_tg_at(config, path, name.as_deref()).await?; + let raw = load_tg_at(config, path, name.as_deref(), &self.dir).await?; GeneratorInputResolved::TypegraphFromTypegate { raw } } GeneratorInputOrder::LoadFdkTemplate { @@ -177,6 +177,7 @@ async fn load_tg_at( config: Arc, path: PathBuf, name: Option<&str>, + dir: &Path, ) -> anyhow::Result> { let console = ConsoleActor::new(Arc::clone(&config)).start(); @@ -185,7 +186,7 @@ async fn load_tg_at( config.clone(), SerializeActionGenerator::new( config_dir.clone(), - config_dir, // TODO cwd + dir.into(), config .prisma_migrations_base_dir(PathOption::Absolute) .into(), @@ -200,7 +201,7 @@ async fn load_tg_at( let mut tgs = report.into_typegraphs()?; if tgs.is_empty() { - bail!("not typegraphs loaded from path at {path:?}") + bail!("no typegraphs loaded from path at {path:?}") } let tg = if let Some(tg_name) = name { if let Some(idx) = tgs.iter().position(|tg| tg.name().unwrap() == tg_name) { diff --git a/src/meta-cli/src/deploy/actors/task_io.rs b/src/meta-cli/src/deploy/actors/task_io.rs index c1a69ff52..2343ebc65 100644 --- a/src/meta-cli/src/deploy/actors/task_io.rs +++ b/src/meta-cli/src/deploy/actors/task_io.rs @@ -31,14 +31,6 @@ mod message { pub(super) struct Exit; } -#[derive(Clone, Copy)] -enum OutputLevel { - Debug, - Info, - Warning, - Error, -} - #[derive(Serialize, Deserialize, Debug)] enum JsonRpcVersion { #[serde(rename = "2.0")] @@ -76,8 +68,8 @@ pub(super) struct TaskIoActor { action: A, task: Addr>, console: Addr, - latest_level: OutputLevel, results: Vec>, + rpc_message_buffer: String, } impl TaskIoActor { @@ -102,8 +94,8 @@ impl TaskIoActor { action, task, console: console.clone(), - latest_level: OutputLevel::Info, results: vec![], + rpc_message_buffer: String::new(), }; let self_addr = ctx.address().downgrade(); @@ -155,6 +147,25 @@ impl Actor for TaskIoActor { } } +#[derive(Deserialize, Debug)] +struct RpcNotificationMessage { + #[allow(dead_code)] + jsonrpc: JsonRpcVersion, + #[serde(flatten)] + notification: RpcNotification, +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "method", content = "params")] +enum RpcNotification { + Debug { message: String }, + Info { message: String }, + Warning { message: String }, + Error { message: String }, + Success { data: serde_json::Value }, + Failure { data: serde_json::Value }, +} + impl Handler for TaskIoActor { type Result = (); @@ -166,57 +177,28 @@ impl Handler for TaskIoActor { Some((prefix, tail)) => { trace!("prefix: {prefix}"); match prefix { - "debug" => { - console.debug(format!("{scope} {tail}")); - self.latest_level = OutputLevel::Debug; + "jsonrpc^" => { + self.rpc_message_buffer.push_str(tail); } - "info" => { - console.info(format!("{scope} {tail}")); - self.latest_level = OutputLevel::Info; - } - "warning" => { - console.warning(format!("{scope} {tail}")); - self.latest_level = OutputLevel::Warning; - } - "error" => { - console.error(format!("{scope} {tail}")); - self.latest_level = OutputLevel::Error; - } - "success" => { - match serde_json::from_str(tail) { - Ok(data) => self.results.push(Ok(data)), - Err(err) => { - console.error(format!("{scope} failed to process message: {err}")); - // TODO fail task? - } - } - } - "failure" => { - match serde_json::from_str(tail) { - Ok(data) => { - self.results.push(Err(data)); - } - Err(err) => { - console.error(format!("{scope} failed to process message: {err}")); - // TODO fail task? - } - } - } - "jsonrpc" => { - match serde_json::from_str(tail) { - Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx), - Err(err) => { - console.error(format!("{scope} failed to process message: {err}")); - // TODO fail task? - } - } + "jsonrpc$" => { + self.rpc_message_buffer.push_str(tail); + let message = std::mem::take(&mut self.rpc_message_buffer); + self.handle_rpc_message(&message, ctx); } - _ => self.handle_continuation(&line), + _ => { + // a log message that were not outputted with the log library + // on the typegraph client + // --> as a debug message + console.debug(format!("{scope}$>{line}")); + } } } None => { - self.handle_continuation(&line); + // a log message that were not outputted with the log library + // on the typegraph client + // --> as a debug message + console.debug(format!("{scope}$>{line}")); } } } @@ -228,23 +210,95 @@ impl TaskIoActor { format!("[{path}]", path = path.yellow()) } - // process as continuation to previous output - fn handle_continuation(&self, line: &str) { + fn handle_rpc_message(&mut self, message: &str, ctx: &mut Context) { let console = &self.console; let scope = self.get_console_scope(); + let message: serde_json::Value = match serde_json::from_str(message) { + Ok(value) => value, + Err(err) => { + self.console + .error(format!("{scope} failed to parse JSON-RPC message: {err}")); + // TODO cancel task? + return; + } + }; + + if message.get("id").is_some() { + // JSON-RPC request + match serde_json::from_value(message) { + Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx), + Err(err) => { + console.error(format!( + "{scope} failed to validate JSON-RPC request: {err}" + )); + // TODO cancel task? + } + } + } else { + // JSON-RPC notification + match serde_json::from_value::(message) + .map(|msg| msg.notification) + { + Ok(notification) => self.handle_rpc_notification(notification), + Err(err) => { + console.error(format!( + "{scope} failed to validate JSON-RPC notification: {err}" + )); + // TODO cancel task? + } + }; + } + } - match self.latest_level { - OutputLevel::Debug => { - console.debug(format!("{scope}>{line}")); + fn handle_rpc_notification(&mut self, notification: RpcNotification) { + let console = &self.console; + let scope = self.get_console_scope(); + match notification { + RpcNotification::Debug { message } => { + for line in message.lines() { + console.debug(format!("{scope} {line}")); + } + } + RpcNotification::Info { message } => { + for line in message.lines() { + console.info(format!("{scope} {line}")); + } + } + RpcNotification::Warning { message } => { + for line in message.lines() { + console.warning(format!("{scope} {line}")); + } } - OutputLevel::Info => { - console.info(format!("{scope}>{line}")); + RpcNotification::Error { message } => { + for line in message.lines() { + console.error(format!("{scope} {line}")); + } } - OutputLevel::Warning => { - console.warning(format!("{scope}>{line}")); + RpcNotification::Success { data } => { + let data = match serde_json::from_value(data) { + Ok(data) => data, + Err(err) => { + console.error(format!( + "{scope} failed to validate JSON-RPC notification (success): {err}" + )); + // TODO cancel task? + return; + } + }; + self.results.push(Ok(data)); } - OutputLevel::Error => { - console.error(format!("{scope}>{line}")); + RpcNotification::Failure { data } => { + let data = match serde_json::from_value(data) { + Ok(data) => data, + Err(err) => { + console.error(format!( + "{scope} failed to validate JSON-RPC notification (failure): {err}" + )); + // TODO cancel task? + return; + } + }; + self.results.push(Err(data)); } } } diff --git a/src/meta-cli/src/deploy/actors/task_manager.rs b/src/meta-cli/src/deploy/actors/task_manager.rs index 1e679d393..8324aa31f 100644 --- a/src/meta-cli/src/deploy/actors/task_manager.rs +++ b/src/meta-cli/src/deploy/actors/task_manager.rs @@ -9,6 +9,7 @@ use crate::{config::Config, interlude::*}; use colored::OwoColorize; use futures::channel::oneshot; use indexmap::IndexMap; +use pathdiff::diff_paths; use signal_handler::set_stop_recipient; use std::collections::VecDeque; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -205,9 +206,16 @@ impl TaskManagerInit { ) -> Option>> { match &self.task_source { TaskSource::Static(paths) => { + let working_dir = self + .action_generator + .get_shared_config() + .working_dir + .clone(); for path in paths { + let relative_path = diff_paths(path, &working_dir); addr.do_send(AddTask { - task_ref: task_generator.generate(path.clone().into(), 0), + task_ref: task_generator + .generate(relative_path.unwrap_or_else(|| path.clone()).into(), 0), reason: TaskReason::User, }); } diff --git a/src/typegraph/deno/src/io.ts b/src/typegraph/deno/src/io.ts index c538db036..e11fab63a 100644 --- a/src/typegraph/deno/src/io.ts +++ b/src/typegraph/deno/src/io.ts @@ -7,6 +7,38 @@ import process from "node:process"; * see: module level documentation `meta-cli/src/deploy/actors/task.rs` */ +const JSONRPC_VERSION = "2.0"; + +function writeRpcMessage(message: string) { + // split into 32-KiB chunks + const chunkSize = 32758; // 32 KiB - 10 bytes for "jsonrpc^: " or "jsonrpc$: " + for (let i = 0; i < message.length; i += chunkSize) { + const chunk = message.slice(i, i + chunkSize); + if (i + chunkSize < message.length) { + process.stdout.write(`jsonrpc^: ${chunk}\n`); + continue; + } + process.stdout.write(`jsonrpc$: ${message.slice(i, i + chunkSize)}\n`); + } +} + +type RpcNotificationMethod = + | "Debug" + | "Info" + | "Warning" + | "Error" + | "Success" + | "Failure"; + +const rpcNotify = (method: RpcNotificationMethod, params: any = null) => { + const message = JSON.stringify({ + jsonrpc: JSONRPC_VERSION, + method, + params, + }); + writeRpcMessage(message); +}; + function getOutput(args: any[]) { return args .map((arg) => { @@ -23,28 +55,27 @@ function getOutput(args: any[]) { export const log = { debug(...args: any[]) { - const output = getOutput(args); - process.stdout.write(`debug: ${output}\n`); + rpcNotify("Debug", { message: getOutput(args) }); }, info(...args: any[]) { - const output = getOutput(args); - process.stdout.write(`info: ${output}\n`); + rpcNotify("Info", { message: getOutput(args) }); }, warn(...args: any[]) { - const output = getOutput(args); - process.stdout.write(`warning: ${output}\n`); + rpcNotify("Warning", { message: getOutput(args) }); }, error(...args: any[]) { - const output = getOutput(args); - process.stdout.write(`error: ${output}\n`); + rpcNotify("Error", { message: getOutput(args) }); }, failure(data: any) { - process.stdout.write(`failure: ${JSON.stringify(data)}\n`); + rpcNotify("Failure", { data: data }); }, success(data: any, noEncode = false) { - const encoded = noEncode ? data : JSON.stringify(data); - process.stdout.write(`success: ${encoded}\n`); + if (noEncode) { + rpcNotify("Success", { data: JSON.parse(data) }); + } else { + rpcNotify("Success", { data: data }); + } }, }; @@ -89,8 +120,6 @@ class RpcResponseReader { } } -const JSONRPC_VERSION = "2.0"; - const rpcCall = (() => { const responseReader = new RpcResponseReader(); let latestRpcId = 0; @@ -104,7 +133,7 @@ const rpcCall = (() => { params, }); - process.stdout.write(`jsonrpc: ${rpcMessage}\n`); + writeRpcMessage(rpcMessage); return responseReader.read(rpcId); }; })(); diff --git a/src/typegraph/python/typegraph/io.py b/src/typegraph/python/typegraph/io.py index 92bb66f38..df43374bf 100644 --- a/src/typegraph/python/typegraph/io.py +++ b/src/typegraph/python/typegraph/io.py @@ -12,6 +12,22 @@ _JSONRPC_VERSION = "2.0" +def write_rpc_message(message: str): + # we do not chunk the message as Python's print function supports long lines + print(f"jsonrpc$: {message}") + + +def rpc_notify(method: str, params: Any): + message = json.dumps( + { + "jsonrpc": _JSONRPC_VERSION, + "method": method, + "params": params, + } + ) + write_rpc_message(message) + + class Log: @staticmethod def __format(*largs: Any): @@ -19,30 +35,31 @@ def __format(*largs: Any): @staticmethod def debug(*largs: Any): - print("debug:", Log.__format(*largs)) + rpc_notify("Debug", {"message": Log.__format(*largs)}) @staticmethod def info(*largs: Any): - print("info:", Log.__format(*largs)) + rpc_notify("Info", {"message": Log.__format(*largs)}) @staticmethod def warn(*largs: Any): - print("warning:", Log.__format(*largs)) + rpc_notify("Warning", {"message": Log.__format(*largs)}) @staticmethod def error(*largs: Any): - print("error:", Log.__format(*largs)) + rpc_notify("Error", {"message": Log.__format(*largs)}) @staticmethod def failure(data: Any): - print("failure:", json.dumps(data)) + rpc_notify("Failure", {"data": data}) @staticmethod def success(data: Any, noencode: bool = False): if noencode: - print("success:", data) + parsed = json.loads(data) + rpc_notify("Success", {"data": parsed}) else: - print("success:", json.dumps(data)) + rpc_notify("Success", {"data": data}) class _RpcResponseReader: @@ -90,7 +107,7 @@ def call(cls, method: str, params: Any): } ) - print(f"jsonrpc: {rpc_message}") + write_rpc_message(rpc_message) return cls.response_reader.read(rpc_id)