From cff6d895665ef1f94c3b47250b522e32d28710ca Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Wed, 13 Nov 2024 15:28:55 +0100 Subject: [PATCH] XMessage::Over for client to ask to stop communication The number of task can be set and pass to add tasks in batch. --- README.md | 8 +- src/actioner.rs | 173 +++++++++++++++++++++++--------------------- src/bin/actioner.rs | 106 +++++++++++++++++---------- src/codec.rs | 3 + 4 files changed, 168 insertions(+), 122 deletions(-) diff --git a/README.md b/README.md index 29338f7..33f0b47 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,8 @@ For testing purpose, I recommend to use in memory DB by running: surreal start --user root --pass root memory ``` -Add task to table and run it +Add task to table and run it. +The tasks can have different scale that run with different range of snooze time, and have different block type that required to be launched in async time or in threads. ```bash ./actionwurm task add -h @@ -58,8 +59,6 @@ Add task to table and run it ./actionwurm task play -a ``` -The tasks can have different scale that run with different range of snooze time, and have different block type that required to be launched in async time or in threads. - To check the task list and filtering on specific state of tasks ```bash @@ -128,6 +127,9 @@ I should polish and clear about design note and make an AEP for it first. At the current stage, the code base is small and every part is clear defined without too much abstractions. +Since the task pool is added by using mocked surrealdb, which requires huge amount of crates dependencies. +The worker and actioner binaries should be moved to crates that has independent `Cargo.toml`, to make the compile of server crate fast. + --------------------- - [ ] benchmark throughput, not too much to bench, the bottleneck is in DB access. diff --git a/src/actioner.rs b/src/actioner.rs index 815c388..c9e25b3 100644 --- a/src/actioner.rs +++ b/src/actioner.rs @@ -32,16 +32,24 @@ pub async fn handle( // - should reported from worker when the mission is finished // - should also get information from worker complain about the long running // block process if it runs on non-block worker. - if let Some(Ok(msg)) = framed_reader.next().await { - match msg { - XMessage::WorkerTablePrint => { - let resp_msg = XMessage::BulkMessage(format!("{}\n", worker_table.render().await,)); - framed_writer.send(resp_msg).await?; - } + loop { + if let Some(Ok(msg)) = framed_reader.next().await { + match msg { + // You say over, I say over + XMessage::Over => { + framed_writer.send(XMessage::Over).await?; + break; + } + + XMessage::WorkerTablePrint => { + let rtable = worker_table.render().await.to_string(); + let resp_msg = XMessage::BulkMessage(rtable); + framed_writer.send(resp_msg).await?; + } - XMessage::TaskTablePrint { states } => { - let count_info = task_table.count().await; - let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.", + XMessage::TaskTablePrint { states } => { + let count_info = task_table.count().await; + let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.", count_info.get(&State::Created).unwrap_or(&0), count_info.get(&State::Ready).unwrap_or(&0), count_info.get(&State::Submit).unwrap_or(&0), @@ -50,87 +58,90 @@ pub async fn handle( count_info.get(&State::Terminated(0)).unwrap_or(&0), count_info.get(&State::Terminated(-1)).unwrap_or(&0), ); - let tasks = task_table.filter_by_states(states).await; - let task_table = task::Table::from_mapping(tasks); - let resp_msg = XMessage::BulkMessage(format!( - "{}\n\n{}\n", - task_table.render().await, - count_info, - )); - framed_writer.send(resp_msg).await?; - } + let tasks = task_table.filter_by_states(states).await; + let task_table = task::Table::from_mapping(tasks); + let resp_msg = XMessage::BulkMessage(format!( + "{}\n\n{}", + task_table.render().await, + count_info, + )); + framed_writer.send(resp_msg).await?; + } - // Signal direction - src: actioner, dst: coordinator - // Handle signal n/a -> Created - XMessage::ActionerOp(Operation::AddTask(record_id)) => { - // TODO: need to check if the task exist - // TODO: priority passed from operation - let task_ = Task::new(0, &record_id); - let id = task_table.create(task_.clone()).await; - - // send resp to actioner - let resp_msg = XMessage::BulkMessage(format!( - "Add task id={id}, map to task record_id={record_id} to run.\n" - )); - framed_writer.send(resp_msg).await?; - } - // Signal direction - src: actioner, dst: coordinator - // Handle signal x -> Ready - XMessage::ActionerOp(Operation::PlayTask(id)) => { - // TODO: need to check init state is able to be played - let task_ = task_table.read(&id).await; - if let Some(mut task_) = task_ { - task_.state = task::State::Ready; - task_table.update(&id, task_).await?; + // Signal direction - src: actioner, dst: coordinator + // Handle signal n/a -> Created + XMessage::ActionerOp(Operation::AddTask(record_id)) => { + // TODO: need to check if the task exist + // TODO: priority passed from operation + let task_ = Task::new(0, &record_id); + let id = task_table.create(task_.clone()).await; + + // send resp to actioner + let resp_msg = XMessage::BulkMessage(format!( + "Add task id={id}, map to task record_id={record_id} to run." + )); + framed_writer.send(resp_msg).await?; } + // Signal direction - src: actioner, dst: coordinator + // Handle signal x -> Ready + XMessage::ActionerOp(Operation::PlayTask(id)) => { + // TODO: need to check init state is able to be played + let task_ = task_table.read(&id).await; + if let Some(mut task_) = task_ { + task_.state = task::State::Ready; + task_table.update(&id, task_).await?; + } - let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.\n",)); - framed_writer.send(resp_msg).await?; - } - // Signal direction - src: actioner, dst: coordinator - // Handle signal all pause/created x -> Ready - XMessage::ActionerOp(Operation::PlayAllTask) => { - // TODO: also include pause state to resume - let resumable_tasks = task_table - .filter_by_states(vec![task::State::Created]) - .await; - - for (task_id, _) in resumable_tasks { - let Some(mut task_) = task_table.read(&task_id).await else { - continue; - }; - // XXX: check, is cloned?? so the old_state is different from after changed - let old_state = task_.state; - - task_.state = task::State::Ready; - task_table.update(&task_id, task_).await?; - println!( - "Play task {task_id}: {} -> {}", - old_state, - task::State::Ready - ); + let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.",)); + framed_writer.send(resp_msg).await?; } - } - // Signal direction - src: actioner, dst: coordinator - // Handle signal x -> Terminated(-1) - XMessage::ActionerOp(Operation::KillTask(id)) => { - let task_ = task_table.read(&id).await; - if let Some(mut task_) = task_ { - task_.state = task::State::Terminated(-1); - task_table.update(&id, task_).await?; + // Signal direction - src: actioner, dst: coordinator + // Handle signal all pause/created x -> Ready + XMessage::ActionerOp(Operation::PlayAllTask) => { + // TODO: also include pause state to resume + let resumable_tasks = task_table + .filter_by_states(vec![task::State::Created]) + .await; + + for (task_id, _) in resumable_tasks { + let Some(mut task_) = task_table.read(&task_id).await else { + continue; + }; + // XXX: check, is cloned?? so the old_state is different from after changed + let old_state = task_.state; - // TODO: also sending a cancelling signal to the runnning task on worker + task_.state = task::State::Ready; + task_table.update(&task_id, task_).await?; + println!( + "Play task {task_id}: {} -> {}", + old_state, + task::State::Ready + ); + } + } + // Signal direction - src: actioner, dst: coordinator + // Handle signal x -> Terminated(-1) + XMessage::ActionerOp(Operation::KillTask(id)) => { + let task_ = task_table.read(&id).await; + if let Some(mut task_) = task_ { + task_.state = task::State::Terminated(-1); + task_table.update(&id, task_).await?; + + // TODO: also sending a cancelling signal to the runnning task on worker - let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",)); + let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",)); + framed_writer.send(resp_msg).await?; + } + } + + // boss is asking nonsense + _ => { + let resp_msg = XMessage::BulkMessage(format!( + "Shutup, I try to ignore you, since you say '{msg:#?}'" + )); framed_writer.send(resp_msg).await?; } } - _ => { - let resp_msg = XMessage::BulkMessage(format!( - "Shutup, I try to ignore you, since you say '{msg:#?}'" - )); - framed_writer.send(resp_msg).await?; - } } } diff --git a/src/bin/actioner.rs b/src/bin/actioner.rs index faa6553..37c3f7c 100644 --- a/src/bin/actioner.rs +++ b/src/bin/actioner.rs @@ -3,6 +3,7 @@ use clap::{Parser, Subcommand, ValueEnum}; use futures::SinkExt; use rand::{self, Rng}; use surrealdb::sql::Datetime; +use tokio::time; use uuid::Uuid; use tokio::net::TcpStream; @@ -103,6 +104,10 @@ enum BlockType { enum TaskCommand { /// Add a new task Add { + // number of tasks to add + #[arg(short, long)] + number: u64, + // Scale of task, small, medium, large #[arg(short, long, value_enum)] scale: TaskScale, @@ -205,38 +210,44 @@ async fn main() -> anyhow::Result<()> { } }, Commands::Task { command } => match command { - TaskCommand::Add { scale, block_type } => { - let isblock = match block_type { - BlockType::Sync => true, - BlockType::Async => false, - }; - - // small scale for 100 ~ 1000 millis - // medium for 1000 ~ 10_000 millis - // large for 10_000 ~ 100_000 - let x = { - let mut rng = rand::thread_rng(); - rng.gen_range(1..10) - }; - - let st = match scale { - TaskScale::Small => x * 100, - TaskScale::Medium => x * 1000, - TaskScale::Large => x * 10_000, - }; - - let task = MockTask::new(st, isblock, Utc::now().into()); - - let created: Option = db.create("task").content(task).await?; - - if let Some(created) = created { - let record_id = created.id.to_string(); - - framed_writer - .send(XMessage::ActionerOp(Operation::AddTask(record_id))) - .await?; - } else { - eprintln!("not able to create task to pool."); + TaskCommand::Add { + number, + scale, + block_type, + } => { + for _ in 0..number { + let isblock = match block_type { + BlockType::Sync => true, + BlockType::Async => false, + }; + + // small scale for 100 ~ 1000 millis + // medium for 1000 ~ 10_000 millis + // large for 10_000 ~ 100_000 + let x = { + let mut rng = rand::thread_rng(); + rng.gen_range(1..10) + }; + + let st = match scale { + TaskScale::Small => x * 100, + TaskScale::Medium => x * 1000, + TaskScale::Large => x * 10_000, + }; + + let task = MockTask::new(st, isblock, Utc::now().into()); + + let created: Option = db.create("task").content(task).await?; + + if let Some(created) = created { + let record_id = created.id.to_string(); + + framed_writer + .send(XMessage::ActionerOp(Operation::AddTask(record_id))) + .await?; + } else { + eprintln!("not able to create task to pool."); + } } } TaskCommand::Play { all: true, .. } => { @@ -286,13 +297,32 @@ async fn main() -> anyhow::Result<()> { }, } - if let Some(Ok(msg)) = framed_reader.next().await { - match msg { - XMessage::BulkMessage(s) => { - println!("{s}"); + // After a command bundle, send over to finish the comm and listen to echo + framed_writer.send(XMessage::Over).await?; + + let timeout = time::Duration::from_millis(500); + loop { + tokio::select! { + Some(Ok(msg)) = framed_reader.next() => { + match msg { + XMessage::BulkMessage(s) => { + println!("{s}"); + } + + XMessage::Over => { + break; + } + + _ => { + dbg!(msg); + } + } } - _ => { - dbg!(msg); + + () = time::sleep(timeout) => { + println!("Timeout reached: No message received."); + // Handle timeout, e.g., log, retry, or terminate loop + break; } } } diff --git a/src/codec.rs b/src/codec.rs index 9a7d956..5282bf0 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -101,6 +101,9 @@ pub enum XMessage { from: task::State, to: task::State, }, + + // over + Over, } #[derive(Debug)]