Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] choredb #506

Closed
wants to merge 33 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1928402
chore db initial
olegklimov Oct 23, 2024
bbd5213
chore db getters setters
olegklimov Oct 23, 2024
8fc2207
chore db handlers and test for handlers (doesn't work)
olegklimov Oct 23, 2024
b81ec18
chore db first _update
olegklimov Oct 31, 2024
1810708
chore db first hello world stored
olegklimov Oct 31, 2024
5ae2784
chore db SSE works
olegklimov Oct 31, 2024
f3f4330
chore db CMessage
olegklimov Nov 2, 2024
5a6b1c8
chore db move files around
olegklimov Nov 2, 2024
fddfa4f
warnings
olegklimov Nov 2, 2024
5cdd5a0
cthread sub works
olegklimov Nov 2, 2024
7e85d6d
chore db: move handlers to db
olegklimov Nov 2, 2024
5684a5a
chore db pubsub sleeps and exits connectly
olegklimov Nov 2, 2024
5c12259
chore db improved shutdown time, fixed some sub problems
olegklimov Nov 2, 2024
e97df23
chore db foreign key tested to work
olegklimov Nov 2, 2024
870fceb
chore db simplify a bit
olegklimov Nov 2, 2024
95c90ab
chore db more structs in schema
olegklimov Nov 2, 2024
65f0df5
chores db struct in db
olegklimov Nov 2, 2024
33609eb
fix deadlock and quicksearch
olegklimov Nov 2, 2024
15aa52f
chore db: sub on chore events too
olegklimov Nov 3, 2024
821bb5c
choredb observe works for cthreads cmessages
olegklimov Nov 3, 2024
275344f
chore db ready to start autonomous
olegklimov Nov 3, 2024
cba9829
autonomy 1
olegklimov Nov 3, 2024
2c255ae
autonomy 2
olegklimov Nov 3, 2024
6405587
autonomy 3 (locking works)
olegklimov Nov 3, 2024
aa6c32c
test14 correct shutdown
olegklimov Nov 3, 2024
99b6379
autonomy 4 (have spad)
olegklimov Nov 3, 2024
e85813e
autonomy (almost works)
olegklimov Nov 4, 2024
6118083
autonomy 6 (actually works)
olegklimov Nov 4, 2024
eca7f13
minor
olegklimov Nov 4, 2024
28f4750
Merge branch 'self_configure' into oleg_chore2
JegernOUTT Dec 17, 2024
99a7721
Implement ongoing work tracking with new API endpoints
JegernOUTT Dec 18, 2024
f7c4aa3
Refactor subchat tool handling to use Option<Vec<String>>
JegernOUTT Dec 20, 2024
43d6e18
Merge branch 'self_configure' into oleg_chore2_rebased
JegernOUTT Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
autonomy (almost works)
olegklimov committed Nov 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit e85813e9be05088ae89c302a7a674cd45a8992d1
15 changes: 12 additions & 3 deletions src/agent_db/db_schema_20241102.rs
Original file line number Diff line number Diff line change
@@ -14,10 +14,19 @@ pub fn create_tables_20241102(conn: &Connection, reset_memory: bool) -> Result<(
pubevent_id INTEGER PRIMARY KEY AUTOINCREMENT,
pubevent_channel TEXT NOT NULL,
pubevent_action TEXT NOT NULL,
pubevent_json TEXT NOT NULL
pubevent_json TEXT NOT NULL,
pubevent_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)",
[],
).map_err(|e| e.to_string())?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS pubsub_events_delete_old
AFTER INSERT ON pubsub_events
BEGIN
DELETE FROM pubsub_events WHERE pubevent_ts <= datetime('now', '-15 minutes');
END;",
[],
).map_err(|e| e.to_string())?;
conn.execute(
"CREATE TABLE IF NOT EXISTS chores (
chore_id TEXT PRIMARY KEY,
@@ -70,8 +79,8 @@ pub fn create_tables_20241102(conn: &Connection, reset_memory: bool) -> Result<(
cmessage_num INT NOT NULL,
cmessage_prev_alt INT NOT NULL,
cmessage_usage_model TEXT NOT NULL,
cmessage_usage_prompt TEXT NOT NULL,
cmessage_usage_completion TEXT NOT NULL,
cmessage_usage_prompt INT NOT NULL,
cmessage_usage_completion INT NOT NULL,
cmessage_json TEXT NOT NULL,
PRIMARY KEY (cmessage_belongs_to_cthread_id, cmessage_alt, cmessage_num),
FOREIGN KEY (cmessage_belongs_to_cthread_id)
4 changes: 2 additions & 2 deletions src/agent_db/db_structs.rs
Original file line number Diff line number Diff line change
@@ -49,8 +49,8 @@ pub struct CMessage {
// /primary
pub cmessage_prev_alt: i32,
pub cmessage_usage_model: String,
pub cmessage_usage_prompt: String,
pub cmessage_usage_completion: String,
pub cmessage_usage_prompt: i32,
pub cmessage_usage_completion: i32,
pub cmessage_json: String,
}

104 changes: 76 additions & 28 deletions src/autonomy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::RwLock as ARwLock;
use tokio::sync::{RwLock as ARwLock, Mutex as AMutex};
use indexmap::IndexSet;

use crate::global_context::GlobalContext;
use crate::agent_db::db_structs::{CThread, CMessage};
use crate::agent_db::chore_pubsub_sleeping_procedure;
use crate::agent_db::db_cthread::CThreadSubscription;
use crate::call_validation::{ChatMessage, ChatUsage};
use crate::call_validation::{ChatContent, ChatMessage};
use crate::at_commands::at_commands::AtCommandsContext;

const SLEEP_IF_NO_WORK_SEC: u64 = 10;
const LOCK_TOO_OLD_SEC: f64 = 600.0;
@@ -150,14 +151,38 @@ async fn do_the_job(
cthread_rec: &CThread,
cmessages: &Vec<CMessage>,
) -> Result<serde_json::Value, String> {
let mut messages: Vec<ChatMessage> = cmessages.iter().map(|cmsg| { serde_json::from_str(&cmsg.cmessage_json).map_err(|e| format!("{}", e))}).collect::<Result<Vec<_>, _>>()?;
let cdb = gcx.read().await.chore_db.clone();
let messages: Vec<ChatMessage> = cmessages.iter().map(|cmsg| { serde_json::from_str(&cmsg.cmessage_json).map_err(|e| format!("{}", e))}).collect::<Result<Vec<_>, _>>()?;
let message_info: Vec<String> = messages.iter().map(|msg| {
let role = &msg.role;
let content_brief = match &msg.content {
ChatContent::SimpleText(text) => { format!("{}", text.len()) },
ChatContent::Multimodal(elements) => {
elements.iter().map(|el| {
if el.is_text() {
format!("text{}", el.m_content.len())
} else {
format!("{}[image]", el.m_type)
}
}).collect::<Vec<_>>().join("+")
},
};
let mut tool_calls_brief = match &msg.tool_calls {
Some(tool_calls) => tool_calls.iter().map(|call| call.function.name.clone()).collect::<Vec<_>>().join("/"),
None => String::new(),
};
if !tool_calls_brief.is_empty() {
tool_calls_brief.insert(0, '/');
}
format!("{}/{}{}", role, content_brief, tool_calls_brief)
}).collect();
let message_info_str = message_info.join(", ");
tracing::info!("{} started work on {}\n[{}]", worker_name, cthread_rec.cthread_id, message_info_str);

// ccx: Arc<AMutex<AtCommandsContext>>,
// wrap_up_depth: usize,
// wrap_up_tokens_cnt: usize,
// wrap_up_prompt: &str,
// wrap_up_n: usize,
let mut usage_collector = ChatUsage { ..Default::default() };
let tools_turned_on_by_cmdline = crate::tools::tools_description::tools_merged_and_filtered(gcx.clone()).await?;
let allow_experimental = gcx.read().await.cmdline.experimental;
let tools_desclist = crate::tools::tools_description::tool_description_list_from_yaml(
@@ -190,40 +215,63 @@ async fn do_the_job(
None,
only_deterministic_messages,
).await?;
let n_ctx = chat_post.max_tokens; // create_chat_post_and_scratchpad saves n_ctx here :/

// let chat_response_msgs = crate::subchat::chat_interaction(ccx.clone(), spad, &mut chat_post).await?;
// let old_messages = messages.clone();
// // no need to remove user from old_messages here, because allow_at is false
let ccx: Arc<AMutex<AtCommandsContext>> = Arc::new(AMutex::new(AtCommandsContext::new(
gcx.clone(),
n_ctx,
7,
false,
messages.clone(),
cthread_rec.cthread_id.clone(),
).await));
// t.subchat_tx = ccx_lock.subchat_tx.clone();
// t.subchat_rx = ccx_lock.subchat_rx.clone();

// XXX at commands
tracing::info!("{} start chat_interaction()", worker_name);
let chat_response_msgs = crate::subchat::chat_interaction(ccx.clone(), spad, &mut chat_post).await?;
if chat_response_msgs.len() == 0 {
return Err("Oops strange, chat_interaction() returned no choices".to_string());
}
let choice0: Vec<ChatMessage> = chat_response_msgs[0].clone();

// save the messages
for (i, chat_message) in choice0.iter().enumerate() {
let mut cmessage_usage_prompt = 0;
let mut cmessage_usage_completion = 0;
if let Some(u) = &chat_message.usage {
cmessage_usage_prompt = u.prompt_tokens as i32;
cmessage_usage_completion = u.completion_tokens as i32;
} else {
tracing::warn!("running {} didn't produce usage so it's hard to calculate tokens :/", cthread_rec.cthread_model);
}
let cmessage = CMessage {
cmessage_belongs_to_cthread_id: cthread_rec.cthread_id.clone(),
cmessage_alt: 0,
cmessage_num: (cmessages.len() as i32) + (i as i32),
cmessage_prev_alt: 0,
cmessage_usage_model: cthread_rec.cthread_model.clone(),
cmessage_usage_prompt,
cmessage_usage_completion,
cmessage_json: serde_json::to_string(chat_message).map_err(|e| format!("{}", e))?,
};
crate::agent_db::db_cmessage::cmessage_set(cdb.clone(), cmessage);
}



// let old_messages = messages.clone();
// let results = chat_response_msgs.iter().map(|new_msgs| {
// let mut extended_msgs = old_messages.clone();
// extended_msgs.extend(new_msgs.clone());
// extended_msgs
// }).collect::<Vec<Vec<ChatMessage>>>();

// if let Some(usage_collector) = usage_collector_mb {
// update_usage_from_messages(usage_collector, &results);
// crate::subchat::update_usage_from_messages(usage_collector, &results);
// }

// if let Some(tx_chatid) = tx_chatid_mb {
// assert!(tx_toolid_mb.is_some());
// let tx_toolid = tx_toolid_mb.unwrap();
// let subchat_tx = ccx.lock().await.subchat_tx.clone();
// for (i, choice) in chat_response_msgs.iter().enumerate() {
// // XXX: ...-choice will not work to store in chat_client.py
// let cid = if chat_response_msgs.len() > 1 {
// format!("{}-choice{}", tx_chatid, i)
// } else {
// tx_chatid.clone()
// };
// for msg_in_choice in choice {
// let message = serde_json::json!({"tool_call_id": tx_toolid, "subchat_id": cid, "add_message": msg_in_choice});
// let _ = subchat_tx.lock().await.send(message);
// }
// }
// }


// {
// // keep session
// let mut step_n = 0;
2 changes: 1 addition & 1 deletion src/subchat.rs
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ async fn chat_interaction_non_stream(
chat_post.only_deterministic_messages,
).await.map_err(|e| {
warn!("network error communicating with the model (2): {:?}", e);
format!("network error communicating with the model (2): {:?}", e)
format!("network error communicating with the model (2): {}", e)
})?;
info!("non stream generation took {:?}ms", t1.elapsed().as_millis() as i32);

6 changes: 3 additions & 3 deletions tests/test14_choredb_observe.py
Original file line number Diff line number Diff line change
@@ -20,8 +20,8 @@ class CMessage(BaseModel):
cmessage_num: int
cmessage_prev_alt: int
cmessage_usage_model: str
cmessage_usage_prompt: str
cmessage_usage_completion: str
cmessage_usage_prompt: int
cmessage_usage_completion: int
cmessage_json: str

def cmessage_key(cmessage_belongs_to_cthread_id, cmessage_alt, cmessage_num):
@@ -147,7 +147,7 @@ def print_messages(indent, msgdict: Dict[str, CMessage]):

def cthread_emojis(cthread: CThread):
archived_emoji = "🗑️" if cthread.cthread_archived_ts else ""
error_emoji = "❌" if cthread.cthread_error else ""
error_emoji = ("❌%s" % cthread.cthread_error) if cthread.cthread_error else ""
new_emoji = "🟡" if cthread.cthread_anything_new else ""
return f"{archived_emoji}{error_emoji}{new_emoji}"