Skip to content

Commit

Permalink
fix(typegraph): send rpc message in chunks in the TS typegraph client (
Browse files Browse the repository at this point in the history
…#904)

- Send the JSON-RPC message is chunks in the TypeScript typegraph client
to prevent reaching the line size limit for stdout. Note: we could not
reproduce the issue locally as it only occurs when using the published
package for Node.js.
- Use JSON-RPC notification for logging and report from the typegraph
clients.
- Other changes:
  - Use relative paths for static task sources in the CLI;
- Fix TODO in `meta gen`: pass the working directory on the
`working_dir` param of `SerializeActionGenerator::new`.

<!-- 2. Explain WHY the change cannot be made simpler -->



<!-- 3. Explain HOW users should update their code -->

#### Migration notes
_N/A_

---

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
Natoandro authored Nov 5, 2024
1 parent 18a13d6 commit 43faf11
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 93 deletions.
2 changes: 1 addition & 1 deletion examples/typegraphs/metagen/rs/fdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Router {
}

pub fn init(&self, args: InitArgs) -> Result<InitResponse, InitError> {
static MT_VERSION: &str = "0.5.0-rc.3";
static MT_VERSION: &str = "0.5.0-rc.4";
if args.metatype_version != MT_VERSION {
return Err(InitError::VersionMismatch(MT_VERSION.into()));
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta-cli/src/cli/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl InputResolver for MetagenCtx {
.join(path)
.canonicalize()
.wrap_err("unable to canonicalize typegraph path, make sure it exists")?;
let raw = load_tg_at(config, path, name.as_deref()).await?;
let raw = load_tg_at(config, path, name.as_deref(), &self.dir).await?;
GeneratorInputResolved::TypegraphFromTypegate { raw }
}
GeneratorInputOrder::LoadFdkTemplate {
Expand All @@ -177,6 +177,7 @@ async fn load_tg_at(
config: Arc<Config>,
path: PathBuf,
name: Option<&str>,
dir: &Path,
) -> anyhow::Result<Box<Typegraph>> {
let console = ConsoleActor::new(Arc::clone(&config)).start();

Expand All @@ -185,7 +186,7 @@ async fn load_tg_at(
config.clone(),
SerializeActionGenerator::new(
config_dir.clone(),
config_dir, // TODO cwd
dir.into(),
config
.prisma_migrations_base_dir(PathOption::Absolute)
.into(),
Expand All @@ -200,7 +201,7 @@ async fn load_tg_at(
let mut tgs = report.into_typegraphs()?;

if tgs.is_empty() {
bail!("not typegraphs loaded from path at {path:?}")
bail!("no typegraphs loaded from path at {path:?}")
}
let tg = if let Some(tg_name) = name {
if let Some(idx) = tgs.iter().position(|tg| tg.name().unwrap() == tg_name) {
Expand Down
186 changes: 120 additions & 66 deletions src/meta-cli/src/deploy/actors/task_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ mod message {
pub(super) struct Exit;
}

#[derive(Clone, Copy)]
enum OutputLevel {
Debug,
Info,
Warning,
Error,
}

#[derive(Serialize, Deserialize, Debug)]
enum JsonRpcVersion {
#[serde(rename = "2.0")]
Expand Down Expand Up @@ -76,8 +68,8 @@ pub(super) struct TaskIoActor<A: TaskAction + 'static> {
action: A,
task: Addr<TaskActor<A>>,
console: Addr<ConsoleActor>,
latest_level: OutputLevel,
results: Vec<ActionResult<A>>,
rpc_message_buffer: String,
}

impl<A: TaskAction + 'static> TaskIoActor<A> {
Expand All @@ -102,8 +94,8 @@ impl<A: TaskAction + 'static> TaskIoActor<A> {
action,
task,
console: console.clone(),
latest_level: OutputLevel::Info,
results: vec![],
rpc_message_buffer: String::new(),
};

let self_addr = ctx.address().downgrade();
Expand Down Expand Up @@ -155,6 +147,25 @@ impl<A: TaskAction + 'static> Actor for TaskIoActor<A> {
}
}

#[derive(Deserialize, Debug)]
struct RpcNotificationMessage {
#[allow(dead_code)]
jsonrpc: JsonRpcVersion,
#[serde(flatten)]
notification: RpcNotification,
}

#[derive(Deserialize, Debug)]
#[serde(tag = "method", content = "params")]
enum RpcNotification {
Debug { message: String },
Info { message: String },
Warning { message: String },
Error { message: String },
Success { data: serde_json::Value },
Failure { data: serde_json::Value },
}

impl<A: TaskAction + 'static> Handler<message::OutputLine> for TaskIoActor<A> {
type Result = ();

Expand All @@ -166,57 +177,28 @@ impl<A: TaskAction + 'static> Handler<message::OutputLine> for TaskIoActor<A> {
Some((prefix, tail)) => {
trace!("prefix: {prefix}");
match prefix {
"debug" => {
console.debug(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Debug;
"jsonrpc^" => {
self.rpc_message_buffer.push_str(tail);
}
"info" => {
console.info(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Info;
}
"warning" => {
console.warning(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Warning;
}
"error" => {
console.error(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Error;
}
"success" => {
match serde_json::from_str(tail) {
Ok(data) => self.results.push(Ok(data)),
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
}
"failure" => {
match serde_json::from_str(tail) {
Ok(data) => {
self.results.push(Err(data));
}
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
}
"jsonrpc" => {
match serde_json::from_str(tail) {
Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx),
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
"jsonrpc$" => {
self.rpc_message_buffer.push_str(tail);
let message = std::mem::take(&mut self.rpc_message_buffer);
self.handle_rpc_message(&message, ctx);
}

_ => self.handle_continuation(&line),
_ => {
// a log message that were not outputted with the log library
// on the typegraph client
// --> as a debug message
console.debug(format!("{scope}$>{line}"));
}
}
}
None => {
self.handle_continuation(&line);
// a log message that were not outputted with the log library
// on the typegraph client
// --> as a debug message
console.debug(format!("{scope}$>{line}"));
}
}
}
Expand All @@ -228,23 +210,95 @@ impl<A: TaskAction + 'static> TaskIoActor<A> {
format!("[{path}]", path = path.yellow())
}

// process as continuation to previous output
fn handle_continuation(&self, line: &str) {
fn handle_rpc_message(&mut self, message: &str, ctx: &mut Context<Self>) {
let console = &self.console;
let scope = self.get_console_scope();
let message: serde_json::Value = match serde_json::from_str(message) {
Ok(value) => value,
Err(err) => {
self.console
.error(format!("{scope} failed to parse JSON-RPC message: {err}"));
// TODO cancel task?
return;
}
};

if message.get("id").is_some() {
// JSON-RPC request
match serde_json::from_value(message) {
Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx),
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC request: {err}"
));
// TODO cancel task?
}
}
} else {
// JSON-RPC notification
match serde_json::from_value::<RpcNotificationMessage>(message)
.map(|msg| msg.notification)
{
Ok(notification) => self.handle_rpc_notification(notification),
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification: {err}"
));
// TODO cancel task?
}
};
}
}

match self.latest_level {
OutputLevel::Debug => {
console.debug(format!("{scope}>{line}"));
fn handle_rpc_notification(&mut self, notification: RpcNotification) {
let console = &self.console;
let scope = self.get_console_scope();
match notification {
RpcNotification::Debug { message } => {
for line in message.lines() {
console.debug(format!("{scope} {line}"));
}
}
RpcNotification::Info { message } => {
for line in message.lines() {
console.info(format!("{scope} {line}"));
}
}
RpcNotification::Warning { message } => {
for line in message.lines() {
console.warning(format!("{scope} {line}"));
}
}
OutputLevel::Info => {
console.info(format!("{scope}>{line}"));
RpcNotification::Error { message } => {
for line in message.lines() {
console.error(format!("{scope} {line}"));
}
}
OutputLevel::Warning => {
console.warning(format!("{scope}>{line}"));
RpcNotification::Success { data } => {
let data = match serde_json::from_value(data) {
Ok(data) => data,
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification (success): {err}"
));
// TODO cancel task?
return;
}
};
self.results.push(Ok(data));
}
OutputLevel::Error => {
console.error(format!("{scope}>{line}"));
RpcNotification::Failure { data } => {
let data = match serde_json::from_value(data) {
Ok(data) => data,
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification (failure): {err}"
));
// TODO cancel task?
return;
}
};
self.results.push(Err(data));
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-cli/src/deploy/actors/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{config::Config, interlude::*};
use colored::OwoColorize;
use futures::channel::oneshot;
use indexmap::IndexMap;
use pathdiff::diff_paths;
use signal_handler::set_stop_recipient;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -205,9 +206,16 @@ impl<A: TaskAction + 'static> TaskManagerInit<A> {
) -> Option<Addr<WatcherActor<A>>> {
match &self.task_source {
TaskSource::Static(paths) => {
let working_dir = self
.action_generator
.get_shared_config()
.working_dir
.clone();
for path in paths {
let relative_path = diff_paths(path, &working_dir);
addr.do_send(AddTask {
task_ref: task_generator.generate(path.clone().into(), 0),
task_ref: task_generator
.generate(relative_path.unwrap_or_else(|| path.clone()).into(), 0),
reason: TaskReason::User,
});
}
Expand Down
Loading

0 comments on commit 43faf11

Please sign in to comment.