Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease committed Oct 17, 2024
1 parent 6ee30c2 commit b42db1c
Showing 1 changed file with 59 additions and 36 deletions.
95 changes: 59 additions & 36 deletions vegafusion-wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
use futures::{SinkExt, StreamExt};
use prost::Message;

use vegafusion_common::data::scalar::{ScalarValue, ScalarValueHelpers};
use vegafusion_core::proto::gen::tasks::{NodeValueIndex, ResponseTaskValue, TaskGraph, TaskGraphValueRequest, TzConfig, VariableNamespace};
use vegafusion_core::task_graph::task_value::TaskValue;
use vegafusion_core::proto::gen::tasks::{
NodeValueIndex, ResponseTaskValue, TaskGraph, TaskGraphValueRequest, TzConfig,
VariableNamespace,
};
use wasm_bindgen::prelude::*;

use js_sys::Promise;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
// use std::sync::mpsc;
use futures::channel::{oneshot, mpsc as async_mpsc};
use futures::channel::{mpsc as async_mpsc, oneshot};

use wasm_bindgen_futures::{future_to_promise, JsFuture};
use wasm_bindgen_futures::JsFuture;

use serde_json::Value;
use wasm_bindgen_futures::spawn_local;
use vegafusion_common::data::table::VegaFusionTable;

use vegafusion_core::planning::stitch::CommPlan;
use vegafusion_core::planning::watch::{ExportUpdateJSON, ExportUpdateNamespace, WatchPlan};

use vegafusion_core::proto::gen::services::{
query_request, query_result, QueryRequest, QueryResult,
};
use vegafusion_core::spec::chart::ChartSpec;
use vegafusion_core::task_graph::graph::ScopedVariable;

use vegafusion_core::planning::plan::SpecPlan;
use web_sys::Element;
use vegafusion_core::chart_state::ChartState;
use vegafusion_core::data::dataset::VegaFusionDataset;
use vegafusion_core::runtime::VegaFusionRuntimeTrait;
use web_sys::Element;

pub fn set_panic_hook() {
// When the `console_error_panic_hook` feature is enabled, we can call the
Expand All @@ -54,23 +51,29 @@ extern "C" {
}

pub struct VegaFusionWasmRuntime {
sender: async_mpsc::Sender<(QueryRequest, oneshot::Sender<vegafusion_common::error::Result<Vec<ResponseTaskValue>>>)>
sender: async_mpsc::Sender<(
QueryRequest,
oneshot::Sender<vegafusion_common::error::Result<Vec<ResponseTaskValue>>>,
)>,
}

impl VegaFusionWasmRuntime {
pub fn new(query_fn: js_sys::Function) -> Self {
let (sender, mut receiver) = async_mpsc::channel::<(QueryRequest, oneshot::Sender<vegafusion_common::error::Result<Vec<ResponseTaskValue>>>)>(32);
let (sender, mut receiver) = async_mpsc::channel::<(
QueryRequest,
oneshot::Sender<vegafusion_common::error::Result<Vec<ResponseTaskValue>>>,
)>(32);

// Spawn a task to process incoming requests
spawn_local(async move {
while let Some((request_msg, response_tx)) = receiver.next().await {

let mut buf: Vec<u8> = Vec::with_capacity(request_msg.encoded_len());
request_msg.encode(&mut buf).unwrap();

let context =
js_sys::JSON::parse(&serde_json::to_string(&serde_json::Value::Null).unwrap()).unwrap();

js_sys::JSON::parse(&serde_json::to_string(&serde_json::Value::Null).unwrap())
.unwrap();

let js_buffer = js_sys::Uint8Array::from(buf.as_slice());
let promise = query_fn
.call1(&context, &js_buffer)
Expand All @@ -84,30 +87,39 @@ impl VegaFusionWasmRuntime {

match response.response.unwrap() {
query_result::Response::Error(error) => {
response_tx.send(Err(vegafusion_common::error::VegaFusionError::internal(format!("{error:?}")))).unwrap();
},
response_tx
.send(Err(vegafusion_common::error::VegaFusionError::internal(
format!("{error:?}"),
)))
.unwrap();
}
query_result::Response::TaskGraphValues(task_graph_value_response) => {
response_tx.send(Ok(task_graph_value_response.response_values)).unwrap();
},
response_tx
.send(Ok(task_graph_value_response.response_values))
.unwrap();
}
}
}
});

VegaFusionWasmRuntime {
sender
}
VegaFusionWasmRuntime { sender }
}
}
#[async_trait::async_trait]
impl VegaFusionRuntimeTrait for VegaFusionWasmRuntime {
async fn query_request(&self, task_graph: Arc<TaskGraph>, indices: &[NodeValueIndex], _inline_datasets: &HashMap<String, VegaFusionDataset>) -> vegafusion_common::error::Result<Vec<ResponseTaskValue>> {
async fn query_request(
&self,
task_graph: Arc<TaskGraph>,
indices: &[NodeValueIndex],
_inline_datasets: &HashMap<String, VegaFusionDataset>,
) -> vegafusion_common::error::Result<Vec<ResponseTaskValue>> {
// Request initial values
let request_msg = QueryRequest {
request: Some(query_request::Request::TaskGraphValues(
TaskGraphValueRequest {
task_graph: Some(task_graph.as_ref().clone()),
indices: Vec::from(indices),
inline_datasets: vec![], // TODO: inline datasets
inline_datasets: vec![], // TODO: inline datasets
},
)),
};
Expand Down Expand Up @@ -228,7 +240,7 @@ impl ChartHandle {
serde_json::from_str(
&js_sys::JSON::stringify(&val).unwrap().as_string().unwrap(),
)
.unwrap()
.unwrap()
};

if verbose {
Expand Down Expand Up @@ -259,7 +271,7 @@ impl ChartHandle {
let val: serde_json::Value = serde_json::from_str(
&js_sys::JSON::stringify(&val).unwrap().as_string().unwrap(),
)
.unwrap();
.unwrap();
if verbose {
log(&format!("VegaFusion(wasm): Sending data {name}"));
log(&serde_json::to_string_pretty(&val).unwrap());
Expand Down Expand Up @@ -324,13 +336,16 @@ pub async fn render_vegafusion(
};

let runtime = VegaFusionWasmRuntime::new(query_fn);
let chart_state = ChartState::try_new(&runtime, spec, Default::default(), tz_config, None).await.unwrap();
let chart_state = ChartState::try_new(&runtime, spec, Default::default(), tz_config, None)
.await
.unwrap();
// Mount vega chart
let dataflow = parse(
js_sys::JSON::parse(
&serde_json::to_string(chart_state.get_transformed_spec()).expect("Failed to parse spec as JSON"),
&serde_json::to_string(chart_state.get_transformed_spec())
.expect("Failed to parse spec as JSON"),
)
.unwrap(),
.unwrap(),
);

let view = View::new(dataflow);
Expand All @@ -342,15 +357,24 @@ pub async fn render_vegafusion(

let view_rc = Rc::new(view);
let handle = ChartHandle {
state: chart_state, view: view_rc.clone(), verbose, debounce_wait, debounce_max_wait, sender
state: chart_state,
view: view_rc.clone(),
verbose,
debounce_wait,
debounce_max_wait,
sender,
};
handle.register_callbacks();
let inner_handle = handle.clone();

// listen for callback updates
spawn_local(async move {
while let Some(update) = receiver.next().await {
let response_update = inner_handle.state.update(&runtime, vec![update]).await.unwrap();
let response_update = inner_handle
.state
.update(&runtime, vec![update])
.await
.unwrap();
inner_handle.update_view(&response_update);
}
});
Expand All @@ -360,7 +384,6 @@ pub async fn render_vegafusion(
handle
}


#[wasm_bindgen]
pub fn vega_version() -> String {
inner_vega_version()
Expand Down

0 comments on commit b42db1c

Please sign in to comment.