Skip to content

Commit

Permalink
updated ui
Browse files Browse the repository at this point in the history
  • Loading branch information
kvey committed Sep 20, 2024
1 parent 8cec071 commit 0086d78
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 251 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
description = "Add your description here"
dependencies = [
"litellm[proxy]>=1.37.17",
"pydantic>=2.8.2",
]
readme = "README.md"
requires-python = ">= 3.8"
Expand Down
2 changes: 2 additions & 0 deletions toolchain/chidori-core/src/cells/code_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ pub fn code_cell(execution_state_id: ExecutionNodeId, cell: &CodeCell, range: &T
let s = s.clone();
let cell = cell.clone();
async move {
println!("Should be running source_code_run_deno");
let result = crate::library::std::code::runtime_deno::source_code_run_deno(
&s,
&cell.source_code,
&x,
&cell.function_invocation,
).await?;
println!("After the evaluation of running source_code_run_deno");
Ok(OperationFnOutput {
has_error: false,
execution_state: None,
Expand Down
49 changes: 34 additions & 15 deletions toolchain/chidori-core/src/execution/execution/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::{Mutex};
use std::thread::{JoinHandle};
use std::time::Duration;
use anyhow::anyhow;
use dashmap::DashMap;
use futures_util::FutureExt;

use crate::execution::primitives::identifiers::{DependencyReference, OperationId};
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct ExecutionGraph {
execution_graph: Arc<Mutex<ExecutionGraphDiGraphSet>>,

// TODO: move to just using the digraph for this
pub(crate) execution_node_id_to_state: Arc<Mutex<HashMap<ExecutionNodeId, ExecutionStateEvaluation>>>,
pub(crate) execution_node_id_to_state: Arc<DashMap<ExecutionNodeId, ExecutionStateEvaluation>>,

/// Sender channel for sending messages to the execution graph
graph_mutation_sender: tokio::sync::mpsc::Sender<ExecutionGraphSendPayload>,
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ExecutionGraph {
println!("Initializing ExecutionGraph");
let (sender, mut receiver) = tokio::sync::mpsc::channel::<ExecutionGraphSendPayload>(1028);

let mut state_id_to_state = HashMap::new();
let mut state_id_to_state = DashMap::new();

// Initialization of the execution graph at Uuid::nil - this is always the root of the execution graph
let init_id = Uuid::nil();
Expand All @@ -150,7 +151,7 @@ impl ExecutionGraph {
let execution_graph_clone = execution_graph.clone();

// Mapping of state_ids to state
let mut state_id_to_state = Arc::new(Mutex::new(state_id_to_state));
let mut state_id_to_state = Arc::new(state_id_to_state);
let state_id_to_state_clone = state_id_to_state.clone();

// Notification of successful startup
Expand Down Expand Up @@ -203,30 +204,30 @@ impl ExecutionGraph {

// Pushing this state into the graph
let mut execution_graph = execution_graph_clone.lock().unwrap();
let mut state_id_to_state = state_id_to_state_clone.lock().unwrap();
let mut state_id_to_state = state_id_to_state_clone.clone();

match resulting_execution_state {
ExecutionStateEvaluation::Error(state) => {
let resulting_state_id = state.id;
state_id_to_state.deref_mut().insert(resulting_state_id.clone(), s.clone());
state_id_to_state.insert(resulting_state_id.clone(), s.clone());
execution_graph.deref_mut()
.add_edge(state.parent_state_id, resulting_state_id.clone(), s);
}
ExecutionStateEvaluation::EvalFailure(id) => {
state_id_to_state.deref_mut().insert(id.clone(), s.clone());
state_id_to_state.insert(id.clone(), s.clone());
}
ExecutionStateEvaluation::Complete(state) => {
// println!("Adding to execution state!!!! {:?}", state.id);
let resulting_state_id = state.id;
state_id_to_state.deref_mut().insert(resulting_state_id.clone(), s.clone());
state_id_to_state.insert(resulting_state_id.clone(), s.clone());
execution_graph.deref_mut()
.add_edge(state.parent_state_id, resulting_state_id.clone(), s);
}
ExecutionStateEvaluation::Executing(state) => {
let resulting_state_id = state.id;
if let Some(ExecutionStateEvaluation::Complete(_)) = state_id_to_state.deref().get(&resulting_state_id) {
if let Some(ExecutionStateEvaluation::Complete(_)) = state_id_to_state.get(&resulting_state_id).map(|x| x.clone()) {
} else {
state_id_to_state.deref_mut().insert(resulting_state_id.clone(), s.clone());
state_id_to_state.insert(resulting_state_id.clone(), s.clone());
execution_graph.deref_mut()
.add_edge(state.parent_state_id, resulting_state_id.clone(), s);
}
Expand Down Expand Up @@ -310,8 +311,8 @@ impl ExecutionGraph {
}

pub fn get_state_at_id(&self, id: ExecutionNodeId) -> Option<ExecutionStateEvaluation> {
let state_id_to_state = self.execution_node_id_to_state.lock().unwrap();
state_id_to_state.get(&id).cloned()
let state_id_to_state = self.execution_node_id_to_state.clone();
state_id_to_state.get(&id).map(|x| x.clone())
}

#[tracing::instrument]
Expand All @@ -336,7 +337,6 @@ impl ExecutionGraph {

let mut merged_state = HashMap::new();
for predecessor in queue {
println!("Getting state {:?}", &predecessor);
let state = self.get_state_at_id(predecessor).unwrap();
if let ExecutionStateEvaluation::Complete(state) = state {
for (k, v) in state.state.iter() {
Expand All @@ -357,9 +357,9 @@ impl ExecutionGraph {
}

#[tracing::instrument]
fn progress_graph(&mut self, new_state: ExecutionStateEvaluation) -> ExecutionNodeId {
pub fn progress_graph(&mut self, new_state: ExecutionStateEvaluation) -> ExecutionNodeId {
let mut execution_graph = self.execution_graph.lock().unwrap();
let mut state_id_to_state = self.execution_node_id_to_state.lock().unwrap();
let mut state_id_to_state = self.execution_node_id_to_state.clone();
let (parent_id, resulting_state_id ) = match &new_state {
ExecutionStateEvaluation::Complete(state) => {
(state.parent_state_id, state.id)
Expand All @@ -372,7 +372,7 @@ impl ExecutionGraph {
};
println!("Resulting state received from progress_graph {:?}", &resulting_state_id);
// TODO: if state already exists how to handle
state_id_to_state.deref_mut().insert(resulting_state_id.clone(), new_state.clone());
state_id_to_state.insert(resulting_state_id.clone(), new_state.clone());
execution_graph.deref_mut()
.add_edge(parent_id, resulting_state_id.clone(), new_state.clone());
resulting_state_id
Expand Down Expand Up @@ -455,6 +455,25 @@ impl ExecutionGraph {
panic!("No state found for id {:?}", prev_execution_id);
}
}

#[tracing::instrument]
pub async fn immutable_external_step_execution(
source_id: ExecutionNodeId,
state: ExecutionStateEvaluation,
) -> anyhow::Result<(
ExecutionNodeId,
ExecutionStateEvaluation, // the resulting total state of this step
Vec<(OperationId, OperationFnOutput)>, // values emitted by operations during this step
)> {
println!("step_execution_with_previous_state {:?} {:?}", &source_id, &state);
let previous_state = match &state {
ExecutionStateEvaluation::Complete(state1) => state1,
_ => { panic!("Stepping execution should only occur against completed states") }
};
let eval_state = previous_state.determine_next_operation()?;
let (new_state, outputs) = previous_state.step_execution(eval_state).await?;
Ok((source_id, new_state, outputs))
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ impl ExecutionState {
}

fn get_operation_node(&self, operation_id: OperationId) -> anyhow::Result<MutexGuard<OperationNode>> {
println!("Getting operation node");
self.operation_by_id
.get(&operation_id)
.ok_or_else(|| anyhow::anyhow!("Operation not found"))?
Expand Down Expand Up @@ -773,7 +774,7 @@ impl ExecutionState {
}


fn process_next_operation(
fn select_next_operation_to_evalute(
&self,
exec_queue: &mut VecDeque<OperationId>,
) -> anyhow::Result<Option<(Option<String>, OperationId, OperationInputs)>> {
Expand Down Expand Up @@ -845,7 +846,7 @@ impl ExecutionState {
if count_loops >= operation_count * 2 {
return Err(Error::msg("Looped through all operations without detecting an execution"));
}
match self.process_next_operation(&mut exec_queue) {
match self.select_next_operation_to_evalute(&mut exec_queue) {
Ok(Some((op_node_name, next_operation_id, inputs))) => {
let mut new_state = self.initialize_new_state();
new_state.evaluating_fn = None;
Expand Down Expand Up @@ -885,12 +886,17 @@ impl ExecutionState {
let result = pause_future_with_oneshot(ExecutionStateEvaluation::Executing(new_state.clone()), &s).await;
let _recv = result.await;
}
println!("step_execution, getting operation node {:?}", &new_state.evaluating_id);
let op_node = self.get_operation_node(new_state.evaluating_id)?;
println!("step_execution, completed getting operation node {:?}", &new_state.evaluating_id);
let args = new_state.evaluating_arguments.take().unwrap();
let next_operation_id = new_state.evaluating_id.clone();
println!("step_execution about to run op_node.execute");
let result = op_node.execute(&mut new_state, args, None, None).await?;
println!("step_execution after op_node.execute");
outputs.push((next_operation_id, result.clone()));
self.update_state(&mut new_state, next_operation_id, result);
println!("step_execution about to complete, after update_state");
return Ok((ExecutionStateEvaluation::Complete(new_state), outputs));
}
}
Expand Down
Loading

0 comments on commit 0086d78

Please sign in to comment.