diff --git a/vegafusion-wasm/src/lib.rs b/vegafusion-wasm/src/lib.rs index d8fdcf78..455a08c0 100644 --- a/vegafusion-wasm/src/lib.rs +++ b/vegafusion-wasm/src/lib.rs @@ -1,38 +1,35 @@ use futures::{SinkExt, StreamExt}; use prost::Message; -use vegafusion_common::data::scalar::{ScalarValue, ScalarValueHelpers}; -use vegafusion_core::proto::gen::tasks::{NodeValueIndex, ResponseTaskValue, TaskGraph, TaskGraphValueRequest, TzConfig, VariableNamespace}; -use vegafusion_core::task_graph::task_value::TaskValue; +use vegafusion_core::proto::gen::tasks::{ + NodeValueIndex, ResponseTaskValue, TaskGraph, TaskGraphValueRequest, TzConfig, + VariableNamespace, +}; use wasm_bindgen::prelude::*; use js_sys::Promise; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::rc::Rc; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; // use std::sync::mpsc; -use futures::channel::{oneshot, mpsc as async_mpsc}; +use futures::channel::{mpsc as async_mpsc, oneshot}; -use wasm_bindgen_futures::{future_to_promise, JsFuture}; +use wasm_bindgen_futures::JsFuture; use serde_json::Value; use wasm_bindgen_futures::spawn_local; -use vegafusion_common::data::table::VegaFusionTable; -use vegafusion_core::planning::stitch::CommPlan; use vegafusion_core::planning::watch::{ExportUpdateJSON, ExportUpdateNamespace, WatchPlan}; use vegafusion_core::proto::gen::services::{ query_request, query_result, QueryRequest, QueryResult, }; use vegafusion_core::spec::chart::ChartSpec; -use vegafusion_core::task_graph::graph::ScopedVariable; -use vegafusion_core::planning::plan::SpecPlan; -use web_sys::Element; use vegafusion_core::chart_state::ChartState; use vegafusion_core::data::dataset::VegaFusionDataset; use vegafusion_core::runtime::VegaFusionRuntimeTrait; +use web_sys::Element; pub fn set_panic_hook() { // When the `console_error_panic_hook` feature is enabled, we can call the @@ -54,23 +51,29 @@ extern "C" { } pub struct VegaFusionWasmRuntime { - sender: async_mpsc::Sender<(QueryRequest, oneshot::Sender>>)> + sender: async_mpsc::Sender<( + QueryRequest, + oneshot::Sender>>, + )>, } impl VegaFusionWasmRuntime { pub fn new(query_fn: js_sys::Function) -> Self { - let (sender, mut receiver) = async_mpsc::channel::<(QueryRequest, oneshot::Sender>>)>(32); + let (sender, mut receiver) = async_mpsc::channel::<( + QueryRequest, + oneshot::Sender>>, + )>(32); // Spawn a task to process incoming requests spawn_local(async move { while let Some((request_msg, response_tx)) = receiver.next().await { - let mut buf: Vec = Vec::with_capacity(request_msg.encoded_len()); request_msg.encode(&mut buf).unwrap(); - + let context = - js_sys::JSON::parse(&serde_json::to_string(&serde_json::Value::Null).unwrap()).unwrap(); - + js_sys::JSON::parse(&serde_json::to_string(&serde_json::Value::Null).unwrap()) + .unwrap(); + let js_buffer = js_sys::Uint8Array::from(buf.as_slice()); let promise = query_fn .call1(&context, &js_buffer) @@ -84,30 +87,39 @@ impl VegaFusionWasmRuntime { match response.response.unwrap() { query_result::Response::Error(error) => { - response_tx.send(Err(vegafusion_common::error::VegaFusionError::internal(format!("{error:?}")))).unwrap(); - }, + response_tx + .send(Err(vegafusion_common::error::VegaFusionError::internal( + format!("{error:?}"), + ))) + .unwrap(); + } query_result::Response::TaskGraphValues(task_graph_value_response) => { - response_tx.send(Ok(task_graph_value_response.response_values)).unwrap(); - }, + response_tx + .send(Ok(task_graph_value_response.response_values)) + .unwrap(); + } } } }); - VegaFusionWasmRuntime { - sender - } + VegaFusionWasmRuntime { sender } } } #[async_trait::async_trait] impl VegaFusionRuntimeTrait for VegaFusionWasmRuntime { - async fn query_request(&self, task_graph: Arc, indices: &[NodeValueIndex], _inline_datasets: &HashMap) -> vegafusion_common::error::Result> { + async fn query_request( + &self, + task_graph: Arc, + indices: &[NodeValueIndex], + _inline_datasets: &HashMap, + ) -> vegafusion_common::error::Result> { // Request initial values let request_msg = QueryRequest { request: Some(query_request::Request::TaskGraphValues( TaskGraphValueRequest { task_graph: Some(task_graph.as_ref().clone()), indices: Vec::from(indices), - inline_datasets: vec![], // TODO: inline datasets + inline_datasets: vec![], // TODO: inline datasets }, )), }; @@ -228,7 +240,7 @@ impl ChartHandle { serde_json::from_str( &js_sys::JSON::stringify(&val).unwrap().as_string().unwrap(), ) - .unwrap() + .unwrap() }; if verbose { @@ -259,7 +271,7 @@ impl ChartHandle { let val: serde_json::Value = serde_json::from_str( &js_sys::JSON::stringify(&val).unwrap().as_string().unwrap(), ) - .unwrap(); + .unwrap(); if verbose { log(&format!("VegaFusion(wasm): Sending data {name}")); log(&serde_json::to_string_pretty(&val).unwrap()); @@ -324,13 +336,16 @@ pub async fn render_vegafusion( }; let runtime = VegaFusionWasmRuntime::new(query_fn); - let chart_state = ChartState::try_new(&runtime, spec, Default::default(), tz_config, None).await.unwrap(); + let chart_state = ChartState::try_new(&runtime, spec, Default::default(), tz_config, None) + .await + .unwrap(); // Mount vega chart let dataflow = parse( js_sys::JSON::parse( - &serde_json::to_string(chart_state.get_transformed_spec()).expect("Failed to parse spec as JSON"), + &serde_json::to_string(chart_state.get_transformed_spec()) + .expect("Failed to parse spec as JSON"), ) - .unwrap(), + .unwrap(), ); let view = View::new(dataflow); @@ -342,15 +357,24 @@ pub async fn render_vegafusion( let view_rc = Rc::new(view); let handle = ChartHandle { - state: chart_state, view: view_rc.clone(), verbose, debounce_wait, debounce_max_wait, sender + state: chart_state, + view: view_rc.clone(), + verbose, + debounce_wait, + debounce_max_wait, + sender, }; handle.register_callbacks(); let inner_handle = handle.clone(); - + // listen for callback updates spawn_local(async move { while let Some(update) = receiver.next().await { - let response_update = inner_handle.state.update(&runtime, vec![update]).await.unwrap(); + let response_update = inner_handle + .state + .update(&runtime, vec![update]) + .await + .unwrap(); inner_handle.update_view(&response_update); } }); @@ -360,7 +384,6 @@ pub async fn render_vegafusion( handle } - #[wasm_bindgen] pub fn vega_version() -> String { inner_vega_version()