Skip to content

Commit

Permalink
fix our locking issue
Browse files Browse the repository at this point in the history
  • Loading branch information
kvey committed Nov 11, 2024
1 parent 39e9c95 commit c9f9346
Show file tree
Hide file tree
Showing 10 changed files with 716 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Arc, mpsc};
use std::task::{Context, Poll};
use std::sync::{Mutex};
// use no_deadlocks::Mutex;
// use std::sync::{Mutex};
use no_deadlocks::Mutex;
use std::thread::{JoinHandle};
use std::time::Duration;
use anyhow::anyhow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use anyhow::Error;
use tokio::sync::oneshot;
use futures_util::FutureExt;
use tokio::sync::oneshot::error::TryRecvError;
use tracing::debug;
use uuid::Uuid;
use crate::cells::{CellTypes, CodeCell, LLMPromptCell};
use crate::execution::execution::execution_graph::{ExecutionGraphSendPayload, ExecutionNodeId, ChronologyId};
Expand Down Expand Up @@ -173,7 +174,7 @@ pub struct ExecutionState {

impl std::fmt::Debug for ExecutionState {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(&render_map_as_table(self))
f.write_str(&self.chronology_id.to_string())
}
}

Expand Down Expand Up @@ -545,7 +546,7 @@ impl ExecutionState {
// TODO: this should create a coroutine that yields with the result of the function invocation
#[tracing::instrument(parent = parent_span_id.clone(), skip(self, payload))]
pub async fn dispatch(&self, function_name: &str, payload: RkyvSerializedValue, parent_span_id: Option<tracing::Id>) -> anyhow::Result<(Result<RkyvSerializedValue, ExecutionStateErrors>, ExecutionState)> {
println!("Running dispatch {:?}", function_name);
debug!("Running dispatch {:?}", function_name);

// Store the invocation payload into an execution state and record this before executing
let mut before_execution_state = self.create_new_revision_of_execution_state();
Expand Down Expand Up @@ -614,9 +615,9 @@ impl ExecutionState {
if let Some(graph_sender) = self.graph_sender.as_ref() {
let (oneshot_sender, mut oneshot_receiver) = tokio::sync::oneshot::channel();
graph_sender.send((execution_state.clone(), Some(oneshot_sender))).await.expect("Failed to send oneshot signal to the graph receiver");
println!("============= should pause {:?} {:?} =============", &execution_state.chronology_id, &(&execution_state.evaluating_fn));
debug!("============= should pause {:?} {:?} =============", &execution_state.chronology_id, &(&execution_state.evaluating_fn));
let _recv = oneshot_receiver.await.expect("Failed to receive oneshot signal");
println!("============= should resume {:?} {:?} =============", &execution_state.chronology_id, &(&execution_state.evaluating_fn));
debug!("============= should resume {:?} {:?} =============", &execution_state.chronology_id, &(&execution_state.evaluating_fn));
}
}

Expand Down Expand Up @@ -685,7 +686,7 @@ impl ExecutionState {
let mut count_loops = 0;

loop {
println!("looping {:?} {:?}", self.exec_queue, count_loops);
debug!("Looping through queue of executable cells {:?} {:?}", self.exec_queue, count_loops);

if count_loops >= operation_count * 2 {
return Err(Error::msg("Looped through all operations without detecting an execution"));
Expand Down Expand Up @@ -737,6 +738,7 @@ impl ExecutionState {
pub async fn step_execution(
&self,
) -> anyhow::Result<(ExecutionState, Vec<(OperationId, OperationFnOutput)>)> {
debug!("Running step_execution for state {:?}", self.chronology_id);
// 1. Initialize state and prepare for execution
let mut before_execution_state = self.determine_next_operation()?;
let operation_id = before_execution_state.evaluating_operation_id.clone();
Expand Down
5 changes: 2 additions & 3 deletions toolchain/chidori-core/src/library/std/ai/llm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::env;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tracing::debug;
use uuid::Uuid;
use chidori_prompt_format::templating::templates::{ChatModelRoles, TemplateWithSource};
use crate::cells::{LLMCodeGenCellChatConfiguration, LLMPromptCellChatConfiguration, TextRange};
Expand Down Expand Up @@ -320,7 +321,7 @@ pub async fn ai_llm_run_chat_model(
is_function_invocation: bool,
configuration: LLMPromptCellChatConfiguration
) -> anyhow::Result<(Result<RkyvSerializedValue, ExecutionStateErrors>, Option<ExecutionState>)> {
println!("Executing ai_llm_run_chat_model");
debug!("Executing ai_llm_run_chat_model");
let mut template_messages: Vec<TemplateMessage> = Vec::new();
let data = template_data_payload_from_rkyv(&payload);

Expand Down Expand Up @@ -353,8 +354,6 @@ pub async fn ai_llm_run_chat_model(
},
}).await;

println!("Completed chat model execution {:?}", result);

if let Err(e) = result {
return Ok((Result::Err(ExecutionStateErrors::AnyhowError(e)), None))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pub async fn source_code_run_python(
// Capture the current span's ID
let current_span_id = Span::current().id();

println!("Invoking source_code_run_python");
debug!("Invoking source_code_run_python");

let exec_id = increment_source_code_run_counter();

Expand Down
2 changes: 1 addition & 1 deletion toolchain/chidori-core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn main() -> anyhow::Result<()>{
// // Add your deployment logic here
// }
None => {
println!("No command was used");
info!("No command was used");
Ok(())
}
}
Expand Down
59 changes: 35 additions & 24 deletions toolchain/chidori-core/src/sdk/chidori_runtime_instance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{mpsc, Arc};
use tokio::sync::mpsc::Receiver as TokioReceiver;
use no_deadlocks::Mutex;
use std::fmt;
use uuid::Uuid;
use std::time::Duration;
use anyhow::anyhow;
use dashmap::mapref::one::Ref;
use tracing::{debug, info};
use crate::cells::CellTypes;
use crate::execution::execution::execution_graph::{ExecutionGraph, ExecutionNodeId};
use crate::execution::execution::execution_state::{EnclosedState};
Expand Down Expand Up @@ -62,7 +64,7 @@ impl ChidoriRuntimeInstance {
// TODO: reload_cells needs to diff the mutations that live on the current branch, with the state
// that we see in the shared state when this event is fired.
pub async fn reload_cells(&mut self) -> anyhow::Result<()> {
println!("Reloading cells");
debug!("Reloading cells");
let cells_to_upsert: Vec<_> = {
let shared_state = self.shared_state.lock().unwrap();
shared_state.editor_cells.values().map(|cell| cell.clone()).collect()
Expand Down Expand Up @@ -97,14 +99,14 @@ impl ChidoriRuntimeInstance {
}

pub async fn shutdown(&mut self) {
println!("Shutting down Chidori runtime.");
info!("Shutting down Chidori runtime.");
self.db.shutdown().await;
}


// #[tracing::instrument]
pub async fn wait_until_ready(&mut self) -> anyhow::Result<()> {
println!("Awaiting initialization of the execution coordinator");
info!("Awaiting initialization of the execution coordinator");
self.db.execution_depth_orchestration_initialized_notify.notified().await;
Ok(())
}
Expand All @@ -119,7 +121,7 @@ impl ChidoriRuntimeInstance {
// Reload cells to make sure we're up-to-date
self.reload_cells().await?;

let executing_states = Arc::new(tokio::sync::Mutex::new(HashSet::new()));
let executing_states = Arc::new(Mutex::new(HashSet::new()));
// Create a channel for error notifications
let (error_tx, mut error_rx) = tokio::sync::mpsc::channel(32);

Expand All @@ -140,7 +142,7 @@ impl ChidoriRuntimeInstance {

// Receives the results of execution during progression of ExecutionStates
if let Ok(state) = self.rx_execution_states.try_recv() {
println!("InstancedEnvironment received an execution event {:?}", &state);
println!("InstancedEnvironment received an execution event {:?}", &state.chronology_id);
self.push_update_to_client(&state);
self.set_execution_head(&state);
}
Expand All @@ -155,7 +157,7 @@ impl ChidoriRuntimeInstance {
let execution_head_state_id = self.execution_head_state_id;

// Acquire lock and check if we're already executing this state
let mut executing_states_instance = executing_states.lock().await;
let mut executing_states_instance = executing_states.lock().unwrap();
if !executing_states_instance.contains(&execution_head_state_id) {
println!("Will eval step, inserting eval state {:?}", &execution_head_state_id);
executing_states_instance.insert(execution_head_state_id);
Expand All @@ -165,23 +167,32 @@ impl ChidoriRuntimeInstance {
let executing_states = Arc::clone(&executing_states);
let error_tx = error_tx.clone();
let state = self.get_state_at_current_execution_head_result()?.clone();
tokio::spawn(async move {
let result = state.step_execution().await;
match result {
Ok(_) => {
// Handle successful execution
executing_states.lock().await.remove(&execution_head_state_id);
},
Err(err) => {
// Ensure we clean up the execution state
executing_states.lock().await.remove(&execution_head_state_id);
// Send the error through the channel
if let Err(send_err) = error_tx.send(err).await {
eprintln!("Failed to send error through channel: {:?}", send_err);
}

std::thread::spawn(move || {
// Create a new tokio runtime for this thread
let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");

// Enter the runtime context
let _guard = runtime.enter();

// Execute the async block on this runtime
runtime.block_on(async {
let result = state.step_execution().await;
// Clear execution
let mut executing_states_lock = executing_states.lock().unwrap();
executing_states_lock.remove(&execution_head_state_id);
drop(executing_states_lock);

match result {
Err(err) => {
// Send the error through the channel
if let Err(send_err) = error_tx.send(err).await {
eprintln!("Failed to send error through channel: {:?}", send_err);
}
}
Ok(_) => {},
}
}
});
});
}
}
Expand Down Expand Up @@ -280,7 +291,7 @@ impl ChidoriRuntimeInstance {
}

fn set_execution_head(&mut self, state: &ExecutionState) {
println!("Setting execution head");
debug!("Setting execution head to {:?}", state.chronology_id);
// Execution heads can only be Completed states, not states still evaluating
if matches!(&state.evaluating_enclosed_state, EnclosedState::Close(_)) || (&state).evaluating_enclosed_state == EnclosedState::SelfContained {
if state.evaluating_fn.is_none() {
Expand All @@ -296,7 +307,7 @@ impl ChidoriRuntimeInstance {

fn push_update_to_client(&mut self, state: &ExecutionState) {
let state_id = state.chronology_id;
println!("Resulted in state with id {:?}, {:?}", &state_id, &state);
println!("Resulted in state with id {:?}", &state_id);
if let Some(sender) = self.runtime_event_sender.as_mut() {
sender.send(EventsFromRuntime::DefinitionGraphUpdated(state.get_dependency_graph_flattened())).unwrap();
let mut cells = vec![];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt;
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::sync::{mpsc, Arc, MutexGuard};

use no_deadlocks::Mutex;
use uuid::Uuid;
use std::sync::mpsc::Sender;
use tracing::dispatcher::DefaultGuard;
Expand Down
1 change: 0 additions & 1 deletion toolchain/chidori-core/src/utils/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ impl<S> Layer<S> for CustomLayer
}

fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
dbg!(&event);
// Process events here
self.sender.send(TraceEvents::Event).unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions toolchain/chidori-debugger/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ fn maintain_call_tree(
mut traces: ResMut<ChidoriState>,
mut call_tree: ResMut<TracesCallTree>,
) {
let tree = build_call_tree(traces.trace_events.clone(), false);
call_tree.inner = tree;
// let tree = build_call_tree(traces.trace_events.clone(), false);
// call_tree.inner = tree;
}

fn update_positions(
Expand Down
Loading

0 comments on commit c9f9346

Please sign in to comment.