diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index a57f592..45b5bda 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -24,7 +24,6 @@ jobs: matrix: node_version: - 16 - - 17 - 18 - 19 - 20 diff --git a/README.md b/README.md index 26eab9e..8d75c20 100644 --- a/README.md +++ b/README.md @@ -113,16 +113,132 @@ require them. OPENAI_API_KEY=... ``` -### Example +### Examples + +In the table below are examples for Node.js, Python and Rust. You'll need to scroll horizontally to view each. + +The following examples show how to build a simple agent that fetches the top stories from Hacker News and call +the OpenAI API to filter to AI related launches and then format that data into markdown. Results from the example +are pushed into the Chidori database and can be visualized using the prompt-graph-ui project. We'll update this example +with a pattern that makes those results more accessible soon. + +
Node.js Python Rust
+```javascript +const axios = require('axios'); +const {Chidori, GraphBuilder} = require("@1kbirds/chidori"); + +class Story { + constructor(title, url, score) { + this.title = title; + this.url = url; + this.score = score; + } +} + +const HN_URL_TOP_STORIES = "https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty"; + +function fetchStory(id) { + return axios.get(`https://hacker-news.firebaseio.com/v0/item/${id}.json?print=pretty`) + .then(response => response.data); +} + +function fetchHN() { + return axios.get(HN_URL_TOP_STORIES) + .then(response => { + const storyIds = response.data; + const tasks = storyIds.slice(0, 30).map(id => fetchStory(id)); // Limit to 30 stories + return Promise.all(tasks) + .then(stories => { + return stories.map(story => { + const { title, url, score } = story; + return new Story(title, url, score); + }); + }); + }); +} + +class ChidoriWorker { + constructor() { + this.c = new Chidori("0", "http://localhost:9800"); // Assuming this is a connection object, replaced with an empty object for now + } + + async buildGraph() { + const g = new GraphBuilder(); + + const h = g.customNode({ + name: "FetchTopHN", + nodeTypeName: "FetchTopHN", + output: "type FetchTopHN { output: String }" + }); + + const hInterpret = g.promptNode({ + name: "InterpretTheGroup", + template: ` + Based on the following list of HackerNews threads, + filter this list to only launches of new AI projects: {{FetchTopHN.output}} + ` + }); + hInterpret.runWhen(g, h); + + const hFormatAndRank = g.promptNode({ + name: "FormatAndRank", + template: ` + Format this list of new AI projects in markdown, ranking the most + interesting projects from most interesting to least. + + {{InterpretTheGroup.promptResult}} + ` + }); + hFormatAndRank.runWhen(g, hInterpret); + + await g.commit(this.c, 0) + } + + async run() { + // Construct the agent graph + await this.buildGraph(); + + // Start graph execution from the root + // Implement the functionality of the play function + await this.c.play(0, 0); + + // Run the node execution loop + // Implement the functionality of the run_custom_node_loop function + await this.c.runCustomNodeLoop() + } +} + + +async function handleFetchHN(nodeWillExec, cb) { + const stories = await fetchHN(); + // return JSON.stringify(stories); + return cb({ "output": JSON.stringify(stories) }); + // return ; +} + +async function main() { + let w = new ChidoriWorker(); + await w.c.startServer(":memory:") + await w.c.registerCustomNodeHandle("FetchTopHN", handleFetchHN); + await w.run() +} + + +main(); + +``` + + + ```python import aiohttp import asyncio diff --git a/toolchain/chidori/package_node/index.js b/toolchain/chidori/package_node/index.js index f631c5a..14fa489 100644 --- a/toolchain/chidori/package_node/index.js +++ b/toolchain/chidori/package_node/index.js @@ -1,10 +1,7 @@ "use strict"; -const { promisify } = require("util"); - const { simpleFun, - nodehandleDebugExample, nodehandleRunWhen, nodehandleQuery, chidoriNew, @@ -15,21 +12,37 @@ const { chidoriBranch, chidoriQuery, chidoriGraphStructure, - chidoriCustomNode, - chidoriDenoCodeNode, - chidoriVectorMemoryNode + chidoriRegisterCustomNodeHandle, + chidoriRunCustomNodeLoop, + graphbuilderNew, + graphbuilderCustomNode, + graphbuilderPromptNode, + graphbuilderDenoCodeNode, + graphbuilderVectorMemoryNode, + graphbuilderCommit } = require("./native/chidori.node"); +const toSnakeCase = str => str.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`); +const transformKeys = (obj) => { + if (Array.isArray(obj)) { + return obj.map(val => transformKeys(val)); + } else if (obj !== null && obj.constructor === Object) { + return Object.keys(obj).reduce((accumulator, key) => { + accumulator[toSnakeCase(key)] = transformKeys(obj[key]); + return accumulator; + }, {}); + } + return obj; +}; -// Wrapper class for the boxed `Database` for idiomatic JavaScript usage class NodeHandle { - constructor() { - this.nh = nodehandleDebugExample(); + constructor(nh) { + this.nh = nh; } - runWhen(otherNodeHandle) { - return nodehandleRunWhen.call(this.nh, otherNodeHandle); + runWhen(graphBuilder, otherNodeHandle) { + return nodehandleRunWhen.call(this.nh, graphBuilder.g, otherNodeHandle.nh); } query(branch, frame) { @@ -43,8 +56,8 @@ class Chidori { this.chi = chidoriNew(fileId, url); } - startServer() { - return chidoriStartServer.call(this.chi); + startServer(filePath) { + return chidoriStartServer.call(this.chi, filePath); } objectInterface(executionStatus) { @@ -55,7 +68,7 @@ class Chidori { return chidoriPlay.call(this.chi, branch, frame); } - pause() { + pause(branch, frame) { return chidoriPause.call(this.chi, branch, frame); } @@ -63,34 +76,55 @@ class Chidori { return chidoriQuery.call(this.chi, query, branch, frame) } - branch() { + branch(branch, frame) { return chidoriBranch.call(this.chi, branch, frame); } - graphStructure() { - return chidoriGraphStructure.call(this.chi); + graphStructure(branch) { + return chidoriGraphStructure.call(this.chi, branch); + } + + registerCustomNodeHandle(nodeTypeName, handle) { + // TODO: we actually pass a callback to the function provided by the user, which they invoke with their result + return chidoriRegisterCustomNodeHandle.call(this.chi, nodeTypeName, handle); + } + + runCustomNodeLoop() { + return chidoriRunCustomNodeLoop.call(this.chi); } - objInterface() { - return chidoriObjInterface.call(this.chi, branch, frame); +} + +class GraphBuilder { + constructor() { + this.g = graphbuilderNew(); } - customNode(customNodeCreateOpts) { - return chidoriCustomNode.call(this.chi, createCustomNodeOpts); + customNode(createCustomNodeOpts) { + return new NodeHandle(graphbuilderCustomNode.call(this.g, transformKeys(createCustomNodeOpts))); + } + + promptNode(promptNodeCreateOpts) { + return new NodeHandle(graphbuilderPromptNode.call(this.g, transformKeys(promptNodeCreateOpts))); } denoCodeNode(denoCodeNodeCreateOpts) { - return chidoriDenoCodeNode.call(this.chi, denoCodeNodeCreateOpts); + return new NodeHandle(graphbuilderDenoCodeNode.call(this.g, transformKeys(denoCodeNodeCreateOpts))); } vectorMemoryNode(vectorMemoryNodeCreateOpts) { - return chidoriVectorMemoryNode.call(this.chi, vectorMemoryNodeCreateOpts); + return new NodeHandle(graphbuilderVectorMemoryNode.call(this.g, transformKeys(vectorMemoryNodeCreateOpts))); + } + + commit(chidori) { + return graphbuilderCommit.call(this.g, chidori.chi, 0); } } module.exports = { Chidori: Chidori, + GraphBuilder: GraphBuilder, NodeHandle: NodeHandle, simpleFun: simpleFun }; \ No newline at end of file diff --git a/toolchain/chidori/src/translations/nodejs.rs b/toolchain/chidori/src/translations/nodejs.rs index 0c790e2..9fa8823 100644 --- a/toolchain/chidori/src/translations/nodejs.rs +++ b/toolchain/chidori/src/translations/nodejs.rs @@ -1,10 +1,14 @@ use std::cell::RefCell; use std::collections::{HashMap, VecDeque}; +use std::future::Future; use std::marker::PhantomData; +use std::sync::{Arc}; +use tokio::sync::{mpsc, Mutex}; use anyhow::Error; use futures::StreamExt; use log::{debug, info}; use neon::{prelude::*, types::Deferred}; +use neon::handle::Managed; use neon::result::Throw; use once_cell::sync::OnceCell; use tokio::runtime::Runtime; @@ -15,7 +19,9 @@ use prompt_graph_core::proto2::execution_runtime_client::ExecutionRuntimeClient; use prompt_graph_core::proto2::serialized_value::Val; use prompt_graph_exec::tonic_runtime::run_server; use neon_serde3; +use prost::bytes::Buf; use serde::{Deserialize, Serialize}; +use crate::translations::rust::{Chidori, CustomNodeCreateOpts, DenoCodeNodeCreateOpts, GraphBuilder, Handler, NodeHandle, PromptNodeCreateOpts, VectorMemoryNodeCreateOpts}; // Return a global tokio runtime or create one if it doesn't exist. // Throws a JavaScript exception if the `Runtime` fails to create. @@ -31,25 +37,6 @@ async fn get_client(url: String) -> Result anyhow::Result { - let mut client = get_client(url.clone()).await?; - let exec_status = client.merge(RequestFileMerge { - id: file_id.clone(), - file: Some(File { - nodes: vec![node.clone()], - ..Default::default() - }), - branch: 0, - }).await?.into_inner(); - Ok(NodeHandle::from( - url.clone(), - file_id.clone(), - node, - exec_status - )?) -} - - #[derive(Debug)] pub struct SerializedValueWrapper(SerializedValue); @@ -116,161 +103,69 @@ fn from_js_value<'a, C: Context<'a>>(cx: &mut C, value: Handle) -> Neon cx.throw_error("Unsupported type") } +macro_rules! return_or_throw_deferred { + ($channel:expr, $deferred:expr, $m:expr) => { + if let Ok(result) = $m { + $deferred.settle_with($channel, move |mut cx| { + neon_serde3::to_value(&mut cx, &result) + .or_else(|e| cx.throw_error(e.to_string())) + }); + } else { + $deferred.settle_with($channel, move |mut cx| { + cx.throw_error("Error playing") + }); + } + }; +} + // Node handle #[derive(Clone)] -pub struct NodeHandle { - url: String, - file_id: String, - node: Item, - exec_status: ExecutionStatus, - indiv: CleanIndividualNode +pub struct NodeNodeHandle { + n: NodeHandle } -impl NodeHandle { - fn example() -> Self{ - let node = create_code_node( - "Example".to_string(), - vec![None], - "type O { output: String }".to_string(), - SourceNodeType::Code("DENO".to_string(), r#"return {"output": "hello"}"#.to_string(), false), - vec![], - ); - let indiv = derive_for_individual_node(&node).unwrap(); - NodeHandle { - url: "localhost:9800".to_string(), - file_id: "0".to_string(), - node: node, - exec_status: Default::default(), - indiv, - } - } - - fn from(url: String, file_id: String, node: Item, exec_status: ExecutionStatus) -> anyhow::Result { - let indiv = derive_for_individual_node(&node)?; - Ok(NodeHandle { - url, - file_id, - node, - exec_status, - indiv - }) +impl NodeNodeHandle { + fn from(n: NodeHandle) -> NodeNodeHandle { + NodeNodeHandle{ n } } } -impl Finalize for NodeHandle {} - +impl Finalize for NodeNodeHandle {} -impl NodeHandle { - pub fn js_debug_example(mut cx: FunctionContext) -> JsResult>> { - let nh = NodeHandle::example(); - Ok(cx.boxed(RefCell::new(nh))) - } +impl NodeNodeHandle { fn get_name(&self) -> String { - self.node.core.as_ref().unwrap().name.clone() + self.n.get_name() } pub fn run_when(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); let (deferred, promise) = cx.promise(); - let other_node = cx.argument::>>(0)?.downcast_or_throw::>, _>(&mut cx)?; - - let self_ = cx.this() - .downcast_or_throw::>, _>(&mut cx)?; - - let mut self_borrow = self_.borrow_mut(); - let queries = &mut self_borrow.node.core.as_mut().unwrap().queries; - - // Get the constructed query from the target node - let q = construct_query_from_output_type( - &other_node.borrow().get_name(), - &other_node.borrow().get_name(), - &self_.borrow().indiv.output_path - ).unwrap(); - - queries.push(Query { query: Some(q)}); - - let url = self_.borrow().url.clone(); - let file_id = self_.borrow().file_id.clone(); - let node = self_.borrow().node.clone(); - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let result = push_file_merge(&url, &file_id, node).await.unwrap(); - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.boolean(true)) - }); - }); - Ok(promise) - } - - - pub fn query(mut cx: FunctionContext) -> JsResult { - let (deferred, promise) = cx.promise(); - let channel = cx.channel(); - - let branch = cx.argument::(0)?.value(&mut cx) as u64; - let frame = cx.argument::(1)?.value(&mut cx) as u64; - let self_ = cx.this() - .downcast_or_throw::>, _>(&mut cx)?; - let mut self_borrow = self_.borrow(); - let file_id = self_borrow.file_id.clone(); - let url = self_borrow.url.clone(); - let name = &self_borrow.node.core.as_ref().unwrap().name; - - let query = construct_query_from_output_type(&name, &name, &self_.borrow().indiv.output_path).unwrap(); - - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client + .downcast_or_throw::>, _>(&mut cx)?; + + let graph_builder = cx.argument::>(0)?; + let other_node_handle = cx.argument::>>(1)?; + + let mut n = &mut self_.borrow_mut().n; + let g = &graph_builder.g; + let mut graph_builder = g.blocking_lock(); + let other_node = &other_node_handle.borrow().n; + let m = n.run_when(&mut graph_builder, &other_node); + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + Ok(cx.boolean(result)) } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.run_query(QueryAtFrame { - id: file_id, - query: Some(Query { - query: Some(query) - }), - frame, - branch, - }).await; - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - let res = result.into_inner(); - let mut obj = cx.empty_object(); - for value in res.values.iter() { - let c = value.change_value.as_ref().unwrap(); - let k = c.path.as_ref().unwrap().address.join(":"); - let v = c.value.as_ref().unwrap().clone(); - let js = SerializedValueWrapper(v).to_object(&mut cx); - obj.set(&mut cx, k.as_str(), js?).unwrap(); - } - Ok(obj) - } else { - cx.throw_error("Failed to query") - } - }); + cx.throw_error("Error playing") + } }); Ok(promise) - } + } - // fn js_to_string(mut cx: FunctionContext) -> JsResult { - // let branch = cx.argument::(0)?.value(&mut cx); - // let frame = cx.argument::(1)?.value(&mut cx); + // pub fn query(mut cx: FunctionContext) -> JsResult { // - // let channel = cx.channel(); - // - // // let name = self.get_name(); - // Ok(format!("NodeHandle(file_id={}, node={})", self.file_id, name)) - // // - // // Ok(cx.undefined()) // } } @@ -305,64 +200,15 @@ fn obj_to_paths<'a, C: Context<'a>>(cx: &mut C, d: Handle) -> NeonResu Ok(paths) } - - - -#[derive(serde::Serialize, serde::Deserialize)] -struct PromptNodeCreateOpts { - name: String, - queries: Option>, - output_tables: Option>, - template: String, - model: Option +struct NodeChidori { + c: Arc> } +impl Finalize for NodeChidori {} -#[derive(serde::Serialize, serde::Deserialize)] -struct CustomNodeCreateOpts { - name: String, - queries: Option>, - output_tables: Option>, - output: Option, - node_type_name: String -} +impl NodeChidori { -#[derive(serde::Serialize, serde::Deserialize)] -struct DenoCodeNodeCreateOpts { - name: String, - queries: Option>, - output_tables: Option>, - output: Option, - code: String, - is_template: Option -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct VectorMemoryNodeCreateOpts { - name: String, - queries: Option>, - output_tables: Option>, - output: Option, - template: Option, // TODO: default is the contents of the query - action: Option, // TODO: default WRITE - embedding_model: Option, // TODO: default TEXT_EMBEDDING_ADA_002 - db_vendor: Option, // TODO: default QDRANT - collection_name: String, -} - - -struct Chidori { - file_id: String, - current_head: u64, - current_branch: u64, - url: String -} - -impl Finalize for Chidori {} - -impl Chidori { - - fn js_new(mut cx: FunctionContext) -> JsResult> { + fn js_new(mut cx: FunctionContext) -> JsResult> { let file_id = cx.argument::(0)?.value(&mut cx); let url = cx.argument::(1)?.value(&mut cx); @@ -371,55 +217,29 @@ impl Chidori { } // let api_token = cx.argument_opt(2)?.value(&mut cx); debug!("Creating new Chidori instance with file_id={}, url={}, api_token={:?}", file_id, url, "".to_string()); - Ok(cx.boxed(Chidori { - file_id, - current_head: 0, - current_branch: 0, - url, + Ok(cx.boxed(NodeChidori { + c: Arc::new(Mutex::new(Chidori::new(file_id, url))), })) } fn start_server(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; + .downcast_or_throw::, _>(&mut cx)?; let (deferred, promise) = cx.promise(); - let url_server = self_.url.clone(); - let file_path: Option = match cx.argument_opt(0) { - Some(v) => Some(v.downcast_or_throw(&mut cx)), - None => None, - }.map(|p: JsResult| p.unwrap().value(&mut cx)); - std::thread::spawn(move || { - let result = run_server(url_server, file_path); - match result { - Ok(_) => { - println!("Server exited"); - }, - Err(e) => { - println!("Error running server: {}", e); - }, - } - }); - - let url = self_.url.clone(); + let file_path = cx.argument_opt(0).map(|x| x.downcast::(&mut cx).unwrap().value(&mut cx)); + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - 'retry: loop { - let client = get_client(url.clone()); - match client.await { - Ok(connection) => { - eprintln!("Connection successfully established {:?}", &url); - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.undefined()) - }); - break 'retry - }, - Err(e) => { - eprintln!("Error connecting to server: {} with Error {}. Retrying...", &url, &e.to_string()); - std::thread::sleep(std::time::Duration::from_millis(1000)); - } + let mut c = c.lock().await; + let m = c.start_server(file_path).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(_) = m { + Ok(cx.undefined()) + } else { + cx.throw_error("Error playing") } - } + }); }); Ok(promise) } @@ -428,73 +248,44 @@ impl Chidori { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; + .downcast_or_throw::, _>(&mut cx)?; let branch = cx.argument::(0).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; let frame = cx.argument::(1).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; - - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.play(RequestAtFrame { - id: file_id, - frame, - branch, - }).await; - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - neon_serde3::to_value(&mut cx, &result.into_inner()) + let c = c.lock().await; + let m = c.play(branch, frame).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) .or_else(|e| cx.throw_error(e.to_string())) } else { - cx.throw_error("Failed to play runtime.") + cx.throw_error("Error playing") } }); }); Ok(promise) + } fn pause(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; + .downcast_or_throw::, _>(&mut cx)?; let frame = cx.argument::(0).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch.clone(); - + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.pause(RequestAtFrame { - id: file_id, - frame, - branch, - }).await; - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - neon_serde3::to_value(&mut cx, &result.into_inner()) + let c = c.lock().await; + let m = c.pause(frame).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) .or_else(|e| cx.throw_error(e.to_string())) } else { - cx.throw_error("Failed to play runtime.") + cx.throw_error("Error playing") } }); }); @@ -505,33 +296,20 @@ impl Chidori { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch.clone(); + .downcast_or_throw::, _>(&mut cx)?; + let branch = cx.argument::(0).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; + let frame = cx.argument::(1).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.branch(RequestNewBranch { - id: file_id, - source_branch_id: branch, - diverges_at_counter: 0, - }).await; - // TODO: need to somehow handle writing to the current_branch - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - neon_serde3::to_value(&mut cx, &result.into_inner()) + let c = c.lock().await; + let m = c.branch(branch, frame).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) .or_else(|e| cx.throw_error(e.to_string())) } else { - cx.throw_error("Failed to play runtime.") + cx.throw_error("Error playing") } }); }); @@ -542,37 +320,22 @@ impl Chidori { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; + .downcast_or_throw::, _>(&mut cx)?; let query = cx.argument::(0).unwrap_or(JsString::new(&mut cx, "")).value(&mut cx); let branch = cx.argument::(1).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; let frame = cx.argument::(2).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); + + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.run_query(QueryAtFrame { - id: file_id, - query: Some(Query { - query: Some(query) - }), - frame, - branch, - }).await; - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - neon_serde3::to_value(&mut cx, &result.into_inner()) + let c = c.lock().await; + let m = c.query(query, branch, frame).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) .or_else(|e| cx.throw_error(e.to_string())) } else { - cx.throw_error("Failed to play runtime.") + cx.throw_error("Error playing") } }); }); @@ -583,29 +346,18 @@ impl Chidori { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); + .downcast_or_throw::, _>(&mut cx)?; + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let result = client.list_branches(RequestListBranches { - id: file_id, - }).await; - deferred.settle_with(&channel, move |mut cx| { - if let Ok(result) = result { - neon_serde3::to_value(&mut cx, &result.into_inner()) + let c = c.lock().await; + let m = c.list_branches().await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) .or_else(|e| cx.throw_error(e.to_string())) } else { - cx.throw_error("Failed to play runtime.") + cx.throw_error("Error playing") } }); }); @@ -616,82 +368,34 @@ impl Chidori { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch.clone(); + .downcast_or_throw::, _>(&mut cx)?; + let branch = cx.argument::(0).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let file = if let Ok(file) = client.current_file_state(RequestOnlyId { - id: file_id, - branch - }).await { - file - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to get current file state."); - Ok(cx.undefined()) - }); - panic!("Failed to get current file state."); - }; - let mut file = file.into_inner(); - let mut g = CleanedDefinitionGraph::zero(); - g.merge_file(&mut file).unwrap(); + let c = c.lock().await; + let r= c.display_graph_structure(branch).await; deferred.settle_with(&channel, move |mut cx| { - Ok(cx.string(g.get_dot_graph())) + if let Ok(r) = r { + Ok(cx.string(r)) + } else { + cx.throw_error("Error displaying graph structure") + } }); }); Ok(promise) } -// -// // TODO: some of these register handlers instead -// // TODO: list registered graphs should not stream -// // TODO: add a message that sends the current graph state -// - fn list_registered_graphs(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); let (deferred, promise) = cx.promise(); let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); + .downcast_or_throw::, _>(&mut cx)?; + let c = Arc::clone(&self_.c); let rt = runtime(&mut cx)?; rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - let resp = if let Ok(resp) = client.list_registered_graphs(Empty { - }).await { - resp - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to get registered graph stream."); - Ok(cx.undefined()) - }); - panic!("Failed to get registered graph stream."); - }; - let mut stream = resp.into_inner(); - while let Some(x) = stream.next().await { - // callback.call(py, (x,), None); - info!("Registered Graph = {:?}", x); - }; + let c = c.lock().await; + let _ = c.list_registered_graphs().await; deferred.settle_with(&channel, move |mut cx| { Ok(cx.undefined()) }); @@ -753,6 +457,100 @@ impl Chidori { // // } // // + + + + fn register_custom_node_handle(mut cx: FunctionContext) -> JsResult { + let channel = cx.channel(); + let self_ = cx.this() + .downcast_or_throw::, _>(&mut cx)?; + + let function_name: String = cx.argument::(0)?.value(&mut cx); + let callback = cx.argument::(1)?.root(&mut cx); + + let h = callback.to_inner(&mut cx); + let callback = Arc::new(callback); + let c = Arc::clone(&self_.c); + + let rt = runtime(&mut cx)?; + rt.spawn(async move { + let mut c = c.lock().await; + c.register_custom_node_handle(function_name, Handler::new( + move |n| { + let channel_clone = channel.clone(); + let handler_clone = Arc::clone(&callback); + Box::pin(async move { + // TODO: clean this up, can't use ? + let (tx, mut rx) = mpsc::channel::(1); + if let Ok(_) = channel_clone.send(move |mut cx| { + if let Ok(v) = neon_serde3::to_value(&mut cx, &n) { + let js_function = JsFunction::new(&mut cx, move |mut cx| { + if let Ok(v) = cx.argument::(0) { + let value: Result = neon_serde3::from_value(&mut cx, v); + if let Ok(value) = value { + tx.blocking_send(value).unwrap(); + } + } + Ok(cx.undefined()) + })?; + let callback = handler_clone.to_inner(&mut cx); + let _: JsResult = callback.call_with(&mut cx).arg(v).arg(js_function).apply(&mut cx); + } + Ok(serde_json::Value::Null) + }).join() { + // block until we receive the result from the channel + if let Some(value) = rx.recv().await { + Ok(value) + } else { + Ok(serde_json::Value::Null) + } + } else { + Err(anyhow::anyhow!("Failed to send result")) + } + }) + } + )); + }); + Ok(cx.undefined().upcast()) + } + + + fn run_custom_node_loop(mut cx: FunctionContext) -> JsResult { + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + let self_ = cx.this() + .downcast_or_throw::, _>(&mut cx)?; + let c = Arc::clone(&self_.c); + let rt = runtime(&mut cx)?; + rt.spawn(async move { + let mut c = c.lock().await; + let _ = c.run_custom_node_loop().await; + deferred.settle_with((&channel), move |mut cx| { + Ok(cx.undefined()) + }); + + }); + // This promise is never resolved + Ok(promise) + } + + + +} + +struct NodeGraphBuilder { + g: Arc>, +} + +impl Finalize for NodeGraphBuilder {} + +impl NodeGraphBuilder { + fn js_new(mut cx: FunctionContext) -> JsResult> { + Ok(cx.boxed(NodeGraphBuilder { + g: Arc::new(Mutex::new(GraphBuilder::new())), + })) + } + // // TODO: need to figure out passing a buffer of bytes // // TODO: nodes that are added should return a clean definition of what their addition looks like // // TODO: adding a node should also display any errors @@ -789,12 +587,9 @@ impl Chidori { // // TODO: adding a node should also display any errors - fn prompt_node(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); + fn prompt_node(mut cx: FunctionContext) -> JsResult>> { let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - + .downcast_or_throw::, _>(&mut cx)?; let arg0 = cx.argument::(0)?; let arg0_value: PromptNodeCreateOpts = match neon_serde3::from_value(&mut cx, arg0) { Ok(value) => value, @@ -802,208 +597,17 @@ impl Chidori { return cx.throw_error(e.to_string()); } }; - - let queries: Vec> = if let Some(queries) = arg0_value.queries { - queries.into_iter().map(|q| { - if q == "None".to_string() { - None - } else { - Some(q) - } - }).collect() - } else { - vec![None] - }; - - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let prompt_node = create_prompt_node( - arg0_value.name, - queries, - arg0_value.template, - arg0_value.model.unwrap_or("GPT_3_5_TURBO".to_string()), - arg0_value.output_tables.unwrap_or(vec![])); - let node = if let Ok(node) = prompt_node { - node - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - - if let Ok(result ) = push_file_merge(&url, &file_id, node).await { - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.boxed(RefCell::new(result))) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - }); - Ok(promise) + let mut g = self_.g.blocking_lock(); + match g.prompt_node(arg0_value) { + Ok(result) => Ok(cx.boxed(RefCell::new(NodeNodeHandle::from(result)))), + Err(e) => cx.throw_error(e.to_string()) + } } - fn poll_local_code_node_execution(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); + fn custom_node(mut cx: FunctionContext) -> JsResult>> { let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - if let Ok(result) = client.poll_custom_node_will_execute_events(FilteredPollNodeWillExecuteEventsRequest { - id: file_id.clone(), - }).await { - debug!("poll_local_code_node_execution result = {:?}", result); - deferred.settle_with(&channel, move |mut cx| { - neon_serde3::to_value(&mut cx, &result.into_inner()) - .or_else(|e| cx.throw_error(e.to_string())) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - }; - }); - Ok(promise) - } - fn ack_local_code_node_execution(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); - let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = cx.argument::(0)?.value(&mut cx) as u64; - let counter = cx.argument::(1)?.value(&mut cx) as u64; - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - if let Ok(result) = client.ack_node_will_execute_event(RequestAckNodeWillExecuteEvent { - id: file_id.clone(), - branch, - counter, - }).await { - deferred.settle_with(&channel, move |mut cx| { - neon_serde3::to_value(&mut cx, &result.into_inner()) - .or_else(|e| cx.throw_error(e.to_string())) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - } - }); - Ok(promise) - } - - fn respond_local_code_node_execution(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); - let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - - let branch = cx.argument::(0)?.value(&mut cx) as u64; - let counter = cx.argument::(1)?.value(&mut cx) as u64; - let node_name = cx.argument::(2)?.value(&mut cx); - - let response: Option> = match cx.argument_opt(0) { - Some(v) => Some(v.downcast_or_throw(&mut cx)), - None => None, - }; - - // TODO: need parent counters from the original change - // TODO: need source node - - let response_paths = if let Some(response) = response { - // TODO: need better error handling here - obj_to_paths(&mut cx, response.unwrap()).unwrap() - } else { - vec![] - }; - - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let mut client = if let Ok(mut client) = get_client(url).await { - client - } else { - deferred.settle_with(&channel, move |mut cx| { - cx.throw_error::<&str, JsUndefined>("Failed to connect to runtime service."); - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - - // TODO: need to add the output table paths to these - let filled_values = response_paths.into_iter().map(|path| { - ChangeValue { - path: Some(Path { - address: path.0, - }), - value: Some(path.1), - branch, - } - }); - - // TODO: this needs to look more like a real change - client.push_worker_event(FileAddressedChangeValueWithCounter { - branch, - counter, - node_name, - id: file_id.clone(), - change: Some(ChangeValueWithCounter { - filled_values: filled_values.collect(), - parent_monotonic_counters: vec![], - monotonic_counter: counter, - branch, - source_node: "".to_string(), - }) - }).await.unwrap(); - }); - Ok(promise) - } - -// // } -// -// // TODO: handle dispatch to this handler - should accept a callback -// // https://github.com/PyO3/pyo3/issues/525 - fn custom_node(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); - let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - + .downcast_or_throw::, _>(&mut cx)?; let arg0 = cx.argument::(0)?; let arg0_value: CustomNodeCreateOpts = match neon_serde3::from_value(&mut cx, arg0) { Ok(value) => value, @@ -1011,53 +615,16 @@ impl Chidori { return cx.throw_error(e.to_string()); } }; - - let queries: Vec> = if let Some(queries) = arg0_value.queries { - queries.into_iter().map(|q| { - if q == "None".to_string() { - None - } else { - Some(q) - } - }).collect() - } else { - vec![] - }; - - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch; - let rt = runtime(&mut cx)?; - rt.spawn(async move { - // Register the node with the system - let node = create_custom_node( - arg0_value.name, - queries, - arg0_value.output.unwrap_or("type O {}".to_string()), - arg0_value.node_type_name, - arg0_value.output_tables.unwrap_or(vec![]) - ); - if let Ok(result ) = push_file_merge(&url, &file_id, node).await { - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.boxed(RefCell::new(result))) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - }); - Ok(promise) + let mut g = self_.g.blocking_lock(); + match g.custom_node(arg0_value) { + Ok(result) => Ok(cx.boxed(RefCell::new(NodeNodeHandle::from(result)))), + Err(e) => cx.throw_error(e.to_string()) + } } - fn deno_code_node(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); + fn deno_code_node(mut cx: FunctionContext) -> JsResult>> { let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - + .downcast_or_throw::, _>(&mut cx)?; let arg0 = cx.argument::(0)?; let arg0_value: DenoCodeNodeCreateOpts = match neon_serde3::from_value(&mut cx, arg0) { Ok(value) => value, @@ -1065,53 +632,16 @@ impl Chidori { return cx.throw_error(e.to_string()); } }; - - let queries: Vec> = if let Some(queries) = arg0_value.queries { - queries.into_iter().map(|q| { - if q == "None".to_string() { - None - } else { - Some(q) - } - }).collect() - } else { - vec![None] - }; - - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch; - - let rt = runtime(&mut cx)?; - rt.spawn(async move { - let node = create_code_node( - arg0_value.name, - queries, - arg0_value.output.unwrap_or("type O {}".to_string()), - SourceNodeType::Code("DENO".to_string(), arg0_value.code, arg0_value.is_template.unwrap_or(false)), - arg0_value.output_tables.unwrap_or(vec![]) - ); - if let Ok(result ) = push_file_merge(&url, &file_id, node).await { - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.boxed(RefCell::new(result))) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - }); - Ok(promise) + let mut g = self_.g.blocking_lock(); + match g.deno_code_node(arg0_value) { + Ok(result) => Ok(cx.boxed(RefCell::new(NodeNodeHandle::from(result)))), + Err(e) => cx.throw_error(e.to_string()) + } } - fn vector_memory_node(mut cx: FunctionContext) -> JsResult { - let channel = cx.channel(); - let (deferred, promise) = cx.promise(); + fn vector_memory_node(mut cx: FunctionContext) -> JsResult>> { let self_ = cx.this() - .downcast_or_throw::, _>(&mut cx)?; - + .downcast_or_throw::, _>(&mut cx)?; let arg0 = cx.argument::(0)?; let arg0_value: VectorMemoryNodeCreateOpts = match neon_serde3::from_value(&mut cx, arg0) { Ok(value) => value, @@ -1119,56 +649,39 @@ impl Chidori { return cx.throw_error(e.to_string()); } }; + let mut g = self_.g.blocking_lock(); + match g.vector_memory_node(arg0_value) { + Ok(result) => Ok(cx.boxed(RefCell::new(NodeNodeHandle::from(result)))), + Err(e) => cx.throw_error(e.to_string()) + } + } - let queries: Vec> = if let Some(queries) = arg0_value.queries { - queries.into_iter().map(|q| { - if q == "None".to_string() { - None - } else { - Some(q) - } - }).collect() - } else { - vec![] - }; - let file_id = self_.file_id.clone(); - let url = self_.url.clone(); - let branch = self_.current_branch; + + fn commit(mut cx: FunctionContext) -> JsResult { + let channel = cx.channel(); + let (deferred, promise) = cx.promise(); + let self_ = cx.this() + .downcast_or_throw::, _>(&mut cx)?; + let node_chidori = cx.argument::>(0)?; + let branch = cx.argument::(1).unwrap_or(JsNumber::new(&mut cx, 0.0)).value(&mut cx) as u64; + + let c = Arc::clone(&node_chidori.c); + let g = Arc::clone(&self_.g); + let rt = runtime(&mut cx)?; rt.spawn(async move { - let node = create_vector_memory_node( - arg0_value.name, - queries, - arg0_value.output.unwrap_or("type O {}".to_string()), - arg0_value.action.unwrap_or("READ".to_string()), - arg0_value.embedding_model.unwrap_or("TEXT_EMBEDDING_ADA_002".to_string()), - arg0_value.template.unwrap_or("".to_string()), - arg0_value.db_vendor.unwrap_or("QDRANT".to_string()), - arg0_value.collection_name, - arg0_value.output_tables.unwrap_or(vec![]) - ); - let node = if let Ok(node) = node { - node - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; - - if let Ok(result ) = push_file_merge(&url, &file_id, node).await { - deferred.settle_with(&channel, move |mut cx| { - Ok(cx.boxed(RefCell::new(result))) - }); - } else { - deferred.settle_with(&channel, move |mut cx| { - // TODO: throw error - Ok(cx.undefined()) - }); - panic!("Failed to connect to runtime service."); - }; + let mut graph_builder = g.lock().await; + let mut chidori = c.lock().await; + let m = graph_builder.commit(&mut chidori, branch).await; + deferred.settle_with((&channel), move |mut cx| { + if let Ok(result) = m { + neon_serde3::to_value(&mut cx, &result) + .or_else(|e| cx.throw_error(e.to_string())) + } else { + cx.throw_error("Error playing") + } + }); }); Ok(promise) } @@ -1203,22 +716,24 @@ fn neon_simple_fun(mut cx: FunctionContext) -> JsResult { #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { env_logger::init(); - cx.export_function("nodehandleDebugExample", NodeHandle::js_debug_example)?; - cx.export_function("nodehandleRunWhen", NodeHandle::run_when)?; - cx.export_function("nodehandleQuery", NodeHandle::query)?; - cx.export_function("chidoriNew", Chidori::js_new)?; - cx.export_function("chidoriStartServer", Chidori::start_server)?; - cx.export_function("chidoriPlay", Chidori::play)?; - cx.export_function("chidoriPause", Chidori::pause)?; - cx.export_function("chidoriBranch", Chidori::branch)?; - cx.export_function("chidoriQuery", Chidori::query)?; - cx.export_function("chidoriGraphStructure", Chidori::display_graph_structure)?; - cx.export_function("chidoriCustomNode", Chidori::custom_node)?; - cx.export_function("chidoriDenoCodeNode", Chidori::deno_code_node)?; - cx.export_function("chidoriVectorMemoryNode", Chidori::vector_memory_node)?; - cx.export_function("chidoriPollLocalCodeNodeExecution", Chidori::poll_local_code_node_execution)?; - cx.export_function("chidoriAckLocalCodeNodeExecution", Chidori::ack_local_code_node_execution)?; - cx.export_function("chidoriRespondLocalCodeNodeExecution", Chidori::respond_local_code_node_execution)?; + cx.export_function("nodehandleRunWhen", NodeNodeHandle::run_when)?; + // cx.export_function("nodehandleQuery", NodeNodeHandle::query)?; + cx.export_function("chidoriNew", NodeChidori::js_new)?; + cx.export_function("chidoriStartServer", NodeChidori::start_server)?; + cx.export_function("chidoriPlay", NodeChidori::play)?; + cx.export_function("chidoriPause", NodeChidori::pause)?; + cx.export_function("chidoriBranch", NodeChidori::branch)?; + cx.export_function("chidoriQuery", NodeChidori::query)?; + cx.export_function("chidoriGraphStructure", NodeChidori::display_graph_structure)?; + cx.export_function("chidoriRegisterCustomNodeHandle", NodeChidori::register_custom_node_handle)?; + cx.export_function("chidoriRunCustomNodeLoop", NodeChidori::run_custom_node_loop)?; + + cx.export_function("graphbuilderNew", NodeGraphBuilder::js_new)?; + cx.export_function("graphbuilderCustomNode", NodeGraphBuilder::custom_node)?; + cx.export_function("graphbuilderDenoCodeNode", NodeGraphBuilder::deno_code_node)?; + cx.export_function("graphbuilderPromptNode", NodeGraphBuilder::prompt_node)?; + cx.export_function("graphbuilderVectorMemoryNode", NodeGraphBuilder::vector_memory_node)?; + cx.export_function("graphbuilderCommit", NodeGraphBuilder::commit)?; cx.export_function("simpleFun", neon_simple_fun)?; Ok(()) } \ No newline at end of file diff --git a/toolchain/chidori/src/translations/python.rs b/toolchain/chidori/src/translations/python.rs index 4ea2ab9..7471c4e 100644 --- a/toolchain/chidori/src/translations/python.rs +++ b/toolchain/chidori/src/translations/python.rs @@ -464,26 +464,6 @@ struct PyChidori { url: String } - - -// async fn push_file_merge(url: &String, file_id: &String, node: Item) -> Result { -// let mut client = get_client(url.clone()).await?; -// let exec_status = client.merge(RequestFileMerge { -// id: file_id.clone(), -// file: Some(File { -// nodes: vec![node.clone()], -// ..Default::default() -// }), -// branch: 0, -// }).await.map_err(PyErrWrapper::from)?.into_inner(); -// Ok(NodeHandle::from( -// url.clone(), -// file_id.clone(), -// node, -// exec_status -// ).map_err(AnyhowErrWrapper)?) -// } - // TODO: internally all operations should have an assigned counter // we can keep the actual target counter hidden from the host sdk #[pymethods] diff --git a/toolchain/chidori/src/translations/rust.rs b/toolchain/chidori/src/translations/rust.rs index 2294630..6be915d 100644 --- a/toolchain/chidori/src/translations/rust.rs +++ b/toolchain/chidori/src/translations/rust.rs @@ -142,6 +142,18 @@ impl Chidori { Ok(result.into_inner()) } + pub async fn branch( &self, branch: u64, frame: u64, ) -> anyhow::Result { + let file_id = self.file_id.clone(); + let url = self.url.clone(); + let mut client = get_client(url).await?; + let result = client.branch(RequestNewBranch { + id: file_id, + source_branch_id: branch, + diverges_at_counter: frame + }).await?; + Ok(result.into_inner()) + } + pub async fn list_branches( &self) -> anyhow::Result { let file_id = self.file_id.clone(); let url = self.url.clone(); @@ -323,6 +335,7 @@ impl Chidori { if let Some(x) = self.custom_node_handlers.get(&ev.custom_node_type_name.clone().unwrap()) { self.ack_local_code_node_execution(*branch, *counter).await?; let result = (x.as_ref().callback)(ev.clone()).await?; + dbg!(&result); self.respond_local_code_node_execution(*branch, *counter, node_name.clone(), result).await?; } } @@ -356,6 +369,16 @@ impl Default for PromptNodeCreateOpts { } } } +impl PromptNodeCreateOpts { + pub fn merge(&mut self, other: PromptNodeCreateOpts) { + self.name = other.name; + self.queries = other.queries.or(self.queries.take()); + self.output_tables = other.output_tables.or(self.output_tables.take()); + self.template = other.template; + self.model = other.model.or(self.model.take()); + } +} + #[derive(serde::Serialize, serde::Deserialize)] @@ -379,6 +402,17 @@ impl Default for CustomNodeCreateOpts { } } } +impl CustomNodeCreateOpts { + pub fn merge(&mut self, other: CustomNodeCreateOpts) { + self.name = other.name; + self.queries = other.queries.or(self.queries.take()); + self.output_tables = other.output_tables.or(self.output_tables.take()); + self.output = other.output.or(self.output.take()); + self.node_type_name = other.node_type_name; + } +} + + #[derive(serde::Serialize, serde::Deserialize)] pub struct DenoCodeNodeCreateOpts { @@ -402,7 +436,17 @@ impl Default for DenoCodeNodeCreateOpts { } } } - + +impl DenoCodeNodeCreateOpts { + pub fn merge(&mut self, other: DenoCodeNodeCreateOpts) { + self.name = other.name; + self.queries = other.queries.or(self.queries.take()); + self.output_tables = other.output_tables.or(self.output_tables.take()); + self.output = other.output.or(self.output.take()); + self.code = other.code; + self.is_template = other.is_template.or(self.is_template.take()); + } +} #[derive(serde::Serialize, serde::Deserialize)] pub struct VectorMemoryNodeCreateOpts { @@ -435,6 +479,20 @@ impl Default for VectorMemoryNodeCreateOpts { } +impl VectorMemoryNodeCreateOpts { + pub fn merge(&mut self, other: VectorMemoryNodeCreateOpts) { + self.name = other.name; + self.queries = other.queries.or(self.queries.take()); + self.output_tables = other.output_tables.or(self.output_tables.take()); + self.output = other.output.or(self.output.take()); + self.template = other.template.or(self.template.take()); + self.action = other.action.or(self.action.take()); + self.embedding_model = other.embedding_model.or(self.embedding_model.take()); + self.db_vendor = other.db_vendor.or(self.db_vendor.take()); + self.collection_name = other.collection_name; + } +} + fn remap_queries(queries: Option>) -> Vec> { let queries: Vec> = if let Some(queries) = queries { queries.into_iter().map(|q| { @@ -462,23 +520,27 @@ impl GraphBuilder { } } pub fn prompt_node(&mut self, arg: PromptNodeCreateOpts) -> anyhow::Result { + let mut def = PromptNodeCreateOpts::default(); + def.merge(arg); let node = create_prompt_node( - arg.name.clone(), - remap_queries(arg.queries), - arg.template, - arg.model.unwrap_or("GPT_3_5_TURBO".to_string()), - arg.output_tables.unwrap_or(vec![]))?; + def.name.clone(), + remap_queries(def.queries), + def.template, + def.model.unwrap_or("GPT_3_5_TURBO".to_string()), + def.output_tables.unwrap_or(vec![]))?; self.clean_graph.merge_file(&File { nodes: vec![node.clone()], ..Default::default() })?; Ok(NodeHandle::from(node)?) } pub fn custom_node(&mut self, arg: CustomNodeCreateOpts) -> anyhow::Result { + let mut def = CustomNodeCreateOpts::default(); + def.merge(arg); let node = create_custom_node( - arg.name.clone(), - remap_queries(arg.queries.clone()), - arg.output.unwrap_or("type O {}".to_string()), - arg.node_type_name, - arg.output_tables.unwrap_or(vec![]) + def.name.clone(), + remap_queries(def.queries.clone()), + def.output.unwrap_or("type O {}".to_string()), + def.node_type_name, + def.output_tables.unwrap_or(vec![]) ); self.clean_graph.merge_file(&File { nodes: vec![node.clone()], ..Default::default() })?; Ok(NodeHandle::from(node)?) @@ -486,12 +548,14 @@ impl GraphBuilder { pub fn deno_code_node(&mut self, arg: DenoCodeNodeCreateOpts) -> anyhow::Result { + let mut def = DenoCodeNodeCreateOpts::default(); + def.merge(arg); let node = create_code_node( - arg.name.clone(), - remap_queries(arg.queries.clone()), - arg.output.unwrap_or("type O {}".to_string()), - SourceNodeType::Code("DENO".to_string(), arg.code, arg.is_template.unwrap_or(false)), - arg.output_tables.unwrap_or(vec![]) + def.name.clone(), + remap_queries(def.queries.clone()), + def.output.unwrap_or("type O {}".to_string()), + SourceNodeType::Code("DENO".to_string(), def.code, def.is_template.unwrap_or(false)), + def.output_tables.unwrap_or(vec![]) ); self.clean_graph.merge_file(&File { nodes: vec![node.clone()], ..Default::default() })?; Ok(NodeHandle::from(node)?) @@ -499,16 +563,18 @@ impl GraphBuilder { pub fn vector_memory_node(&mut self, arg: VectorMemoryNodeCreateOpts) -> anyhow::Result { + let mut def = VectorMemoryNodeCreateOpts::default(); + def.merge(arg); let node = create_vector_memory_node( - arg.name.clone(), - remap_queries(arg.queries.clone()), - arg.output.unwrap_or("type O {}".to_string()), - arg.action.unwrap_or("READ".to_string()), - arg.embedding_model.unwrap_or("TEXT_EMBEDDING_ADA_002".to_string()), - arg.template.unwrap_or("".to_string()), - arg.db_vendor.unwrap_or("QDRANT".to_string()), - arg.collection_name, - arg.output_tables.unwrap_or(vec![]) + def.name.clone(), + remap_queries(def.queries.clone()), + def.output.unwrap_or("type O {}".to_string()), + def.action.unwrap_or("READ".to_string()), + def.embedding_model.unwrap_or("TEXT_EMBEDDING_ADA_002".to_string()), + def.template.unwrap_or("".to_string()), + def.db_vendor.unwrap_or("QDRANT".to_string()), + def.collection_name, + def.output_tables.unwrap_or(vec![]) )?; self.clean_graph.merge_file(&File { nodes: vec![node.clone()], ..Default::default() })?; Ok(NodeHandle::from(node)?) @@ -583,7 +649,7 @@ impl GraphBuilder { // Node handle #[derive(Clone)] pub struct NodeHandle { - node: Item, + pub node: Item, indiv: CleanIndividualNode } diff --git a/toolchain/chidori/tests/nodejs/chidori.test.js b/toolchain/chidori/tests/nodejs/chidori.test.js index 54bbe9f..93480a0 100644 --- a/toolchain/chidori/tests/nodejs/chidori.test.js +++ b/toolchain/chidori/tests/nodejs/chidori.test.js @@ -1,4 +1,4 @@ -const {Chidori} = require("../.."); +const {Chidori, GraphBuilder} = require("../.."); async function delay(ms) { // Returns a promise that resolves after "ms" milliseconds @@ -7,48 +7,35 @@ async function delay(ms) { test('initialize without error', () => { - expect(new Chidori("1", "localhost:9800")).toEqual({"chi": {}}); + expect(new Chidori("1", "http://localhost:9800")).toEqual({"chi": {}}); }); -test('coerce to and from structure', async () => { - const chi = new Chidori("1", "http://localhost:9800"); - chi.startServer() - expect(chi.objectInterface({ - "id": "1", - "monotonic_counter": 0, - "branch": 0 - })).toEqual({ - "id": "1", - "monotonic_counter": 0, - "branch": 0 - }); -}); - - test('start server', async () => { const chi = new Chidori("21", "http://127.0.0.1:9800"); await chi.startServer(); await chi.play(0,0); - await chi.denoCodeNode({ + const g = new GraphBuilder(); + g.denoCodeNode({ name: "InspirationalQuote", code: `return {"promptResult": "Believe"}`, output: `type InspirationalQuote { promptResult: String }`, is_template: true }); - await chi.denoCodeNode({ + g.denoCodeNode({ name: "CodeNode", queries: ["query Q { InspirationalQuote { promptResult } }"], code: `return {"output": "Here is your quote for " + \`{{InspirationalQuote.promptResult}}\` }`, output: `type CodeNode { output: String }`, is_template: true }); + g.commit(chi, 0); await delay(1000); - console.log(await chi.graphStructure()) - expect((await chi.query(` - query Q { InspirationalQuote { promptResult } } - `, 0, 100))["values"].length).toBe(1) - console.log(await chi.query(` - query Q { CodeNode { output } } - `, 0, 100)) + console.log(await chi.graphStructure(0)) + // expect((await chi.query(` + // query Q { InspirationalQuote { promptResult } } + // `, 0, 100))["values"].length).toBe(1) + // console.log(await chi.query(` + // query Q { CodeNode { output } } + // `, 0, 100)) }); diff --git a/toolchain/chidori/tests/nodejs/nodeHandle.test.js b/toolchain/chidori/tests/nodejs/nodeHandle.test.js index 4c2b49e..6684a38 100644 --- a/toolchain/chidori/tests/nodejs/nodeHandle.test.js +++ b/toolchain/chidori/tests/nodejs/nodeHandle.test.js @@ -1,17 +1,15 @@ -const {simpleFun, NodeHandle} = require("../.."); -const {Chidori} = require("../../package_node"); +const {Chidori, GraphBuilder} = require("../../package_node"); -test('sdk', () => { - expect(simpleFun("1")).toBe("1"); -}); test('adds 1 + 2 to equal 3', () => { - expect(new NodeHandle()).toEqual({"nh": {}}); + const gb = new GraphBuilder(); + expect(gb.denoCodeNode({name: "test", code: "return 1+1"})).toEqual({"nh": {}}); }); test('nodehandle query', async () => { const chi = new Chidori("1", "http://localhost:9800"); await chi.startServer(); - const nh = new NodeHandle(); - expect(await nh.query(0, 0)).toEqual({}); + const gb = new GraphBuilder(); + const nh = gb.denoCodeNode({name: "test", code: "return 1+1"}); + // expect(await nh.query(0, 0)).toEqual({}); }); diff --git a/toolchain/examples/nodejs/index.js b/toolchain/examples/nodejs/index.js new file mode 100644 index 0000000..f82c42e --- /dev/null +++ b/toolchain/examples/nodejs/index.js @@ -0,0 +1,101 @@ +const axios = require('axios'); +const {Chidori, GraphBuilder} = require("@1kbirds/chidori"); + +class Story { + constructor(title, url, score) { + this.title = title; + this.url = url; + this.score = score; + } +} + +const HN_URL_TOP_STORIES = "https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty"; + +function fetchStory(id) { + return axios.get(`https://hacker-news.firebaseio.com/v0/item/${id}.json?print=pretty`) + .then(response => response.data); +} + +function fetchHN() { + return axios.get(HN_URL_TOP_STORIES) + .then(response => { + const storyIds = response.data; + const tasks = storyIds.slice(0, 30).map(id => fetchStory(id)); // Limit to 30 stories + return Promise.all(tasks) + .then(stories => { + return stories.map(story => { + const { title, url, score } = story; + return new Story(title, url, score); + }); + }); + }); +} + +class ChidoriWorker { + constructor() { + this.c = new Chidori("0", "http://localhost:9800"); // Assuming this is a connection object, replaced with an empty object for now + } + + async buildGraph() { + const g = new GraphBuilder(); + + const h = g.customNode({ + name: "FetchTopHN", + nodeTypeName: "FetchTopHN", + output: "type FetchTopHN { output: String }" + }); + + const hInterpret = g.promptNode({ + name: "InterpretTheGroup", + template: ` + Based on the following list of HackerNews threads, + filter this list to only launches of new AI projects: {{FetchTopHN.output}} + ` + }); + hInterpret.runWhen(g, h); + + const hFormatAndRank = g.promptNode({ + name: "FormatAndRank", + template: ` + Format this list of new AI projects in markdown, ranking the most + interesting projects from most interesting to least. + + {{InterpretTheGroup.promptResult}} + ` + }); + hFormatAndRank.runWhen(g, hInterpret); + + await g.commit(this.c, 0) + } + + async run() { + // Construct the agent graph + await this.buildGraph(); + + // Start graph execution from the root + // Implement the functionality of the play function + await this.c.play(0, 0); + + // Run the node execution loop + // Implement the functionality of the run_custom_node_loop function + await this.c.runCustomNodeLoop() + } +} + + +async function handleFetchHN(nodeWillExec, cb) { + const stories = await fetchHN(); + // return JSON.stringify(stories); + return cb({ "output": JSON.stringify(stories) }); + // return ; +} + +async function main() { + let w = new ChidoriWorker(); + await w.c.startServer(":memory:") + await w.c.registerCustomNodeHandle("FetchTopHN", handleFetchHN); + await w.run() +} + + +main(); diff --git a/toolchain/examples/nodejs/package-lock.json b/toolchain/examples/nodejs/package-lock.json new file mode 100644 index 0000000..f0d2347 --- /dev/null +++ b/toolchain/examples/nodejs/package-lock.json @@ -0,0 +1,129 @@ +{ + "name": "top-ai-launches-hn", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "top-ai-launches-hn", + "version": "1.0.0", + "license": "ISC", + "dependencies": { + "axios": "^1.4.0" + } + }, + "../../chidori": { + "name": "@1kbirds/chidori", + "version": "0.1.7", + "cpu": [ + "x64", + "ia32", + "arm64" + ], + "extraneous": true, + "license": "MIT", + "os": [ + "darwin", + "linux", + "win32" + ], + "dependencies": { + "@mapbox/node-pre-gyp": "^1.0.8" + }, + "devDependencies": { + "cargo-cp-artifact": "^0.1.8", + "jest": "^29.6.1" + } + }, + "node_modules/asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" + }, + "node_modules/axios": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.4.0.tgz", + "integrity": "sha512-S4XCWMEmzvo64T9GfvQDOXgYRDJ/wsSZc7Jvdgx5u1sd0JwsuPLqb3SYmusag+edF6ziyMensPVqLTSc1PiSEA==", + "dependencies": { + "follow-redirects": "^1.15.0", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, + "node_modules/combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "dependencies": { + "delayed-stream": "~1.0.0" + }, + "engines": { + "node": ">= 0.8" + } + }, + "node_modules/delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/follow-redirects": { + "version": "1.15.2", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.2.tgz", + "integrity": "sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, + "node_modules/form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + } + } +} diff --git a/toolchain/examples/nodejs/package.json b/toolchain/examples/nodejs/package.json new file mode 100644 index 0000000..0f5b790 --- /dev/null +++ b/toolchain/examples/nodejs/package.json @@ -0,0 +1,14 @@ +{ + "name": "top-ai-launches-hn", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "start": "node index.js" + }, + "author": "", + "license": "ISC", + "dependencies": { + "axios": "^1.4.0" + } +} diff --git a/toolchain/examples/python/top-ai-launches-hn.py b/toolchain/examples/python/top-ai-launches-hn.py index a126b56..ebdeec4 100644 --- a/toolchain/examples/python/top-ai-launches-hn.py +++ b/toolchain/examples/python/top-ai-launches-hn.py @@ -33,9 +33,8 @@ async def fetch_hn() -> List[Story]: stories_out = [] for story in stories: - for k in ('title', 'url', 'score'): - stories_out.append(Story(**dict((k, story.get(k, None))))) - + story_dict = {k: story.get(k, None) for k in ('title', 'url', 'score')} + stories_out.append(Story(**story_dict)) return stories_out @@ -45,7 +44,6 @@ async def fetch_hn() -> List[Story]: class ChidoriWorker: def __init__(self): self.c = Chidori("0", "http://localhost:9800") - self.staged_custom_nodes = [] async def build_graph(self): g = GraphBuilder() diff --git a/toolchain/scripts/build_nodejs_packages.sh b/toolchain/scripts/build_nodejs_packages.sh new file mode 100644 index 0000000..3ff193a --- /dev/null +++ b/toolchain/scripts/build_nodejs_packages.sh @@ -0,0 +1,7 @@ +#!/bin/bash +set -euo pipefail + +cd ./chidori +npm run build +npm run test-js +cd - \ No newline at end of file