Skip to content

Commit

Permalink
Merge pull request #13 from unkcpz/mock-task-pool-surrealdb-test
Browse files Browse the repository at this point in the history
Mock task pool surrealdb test
  • Loading branch information
unkcpz authored Nov 13, 2024
2 parents 781a145 + cff6d89 commit 5f96a64
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 148 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
Cargo.lock
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ anyhow = "1.0.89"
atoi = "2.0.0"
byteorder = "1.5.0"
bytes = "1.7.2"
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.20", features = ["derive"] }
evalexpr = "12.0.1"
futures = "0.3.31"
rand = "0.8.5"
rmp-serde = "1.3.0"
serde = { version = "1.0.210", features = ["serde_derive"] }
surrealdb = "2.0.4" # only for clients, for dev purpose, should be removed for production
tabled = "0.16.0"
thiserror = "1.0.63"
tokio = { version = "1.40.0", features = ["full"] }
Expand Down
24 changes: 18 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,24 @@ Decompress the file and in different terminals or multiplexers, run

to start the coordinator.

Run
The task pool is mocked by using SurrealDB.
Therefore you need to [install the SurrealDB](https://surrealdb.com/docs/surrealdb/installation) and start a DB instance.

For testing purpose, I recommend to use in memory DB by running:

```bash
./actionwurm task add
surreal start --user root --pass root memory
```

Add task to table and run it.
The tasks can have different scale that run with different range of snooze time, and have different block type that required to be launched in async time or in threads.

```bash
./actionwurm task add -h
./actionwurm task play <id>
./actionwurm task play -a
```

to add task to table and run it.

To check the task list and filtering on specific state of tasks

```bash
Expand Down Expand Up @@ -108,9 +116,9 @@ Prototype:
- [x] in memory tasks table.
- [x] pretty print table and passing it to actioner.
- [x] mock use dummy async sleep tasks.
- [ ] mock the task pool where the task are constructed to perform.
- [x] mock the task pool where the task are constructed to perform. (#13)
- [x] worker manage tasks through channels (_kill).
- [ ] task pool mixed of sync/async tasks, benchmark throughput.
- [x] task pool mixed of sync/async tasks. (#13)
- [x] create -> ready state by the `play` signal.
- [x] sound CLI for register and play a single task.

Expand All @@ -119,8 +127,12 @@ I should polish and clear about design note and make an AEP for it first.

At the current stage, the code base is small and every part is clear defined without too much abstractions.

Since the task pool is added by using mocked surrealdb, which requires huge amount of crates dependencies.
The worker and actioner binaries should be moved to crates that has independent `Cargo.toml`, to make the compile of server crate fast.

---------------------

- [ ] benchmark throughput, not too much to bench, the bottleneck is in DB access.
- [ ] pyo3 interface to expose the communicate part for python runner.
- [ ] Adding unit tests for things above so stady to move forward.
- [ ] table management using actor model instead of using mutex.
Expand Down
167 changes: 90 additions & 77 deletions src/actioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@ pub async fn handle(
// - should reported from worker when the mission is finished
// - should also get information from worker complain about the long running
// block process if it runs on non-block worker.
if let Some(Ok(msg)) = framed_reader.next().await {
match msg {
XMessage::WorkerTablePrint => {
let resp_msg = XMessage::BulkMessage(format!("{}\n", worker_table.render().await,));
framed_writer.send(resp_msg).await?;
}
loop {
if let Some(Ok(msg)) = framed_reader.next().await {
match msg {
// You say over, I say over
XMessage::Over => {
framed_writer.send(XMessage::Over).await?;
break;
}

XMessage::WorkerTablePrint => {
let rtable = worker_table.render().await.to_string();
let resp_msg = XMessage::BulkMessage(rtable);
framed_writer.send(resp_msg).await?;
}

XMessage::TaskTablePrint { states } => {
let count_info = task_table.count().await;
let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.",
XMessage::TaskTablePrint { states } => {
let count_info = task_table.count().await;
let count_info = format!("created: {}, ready: {}, submit: {}, pause: {}, run: {}, complete: {}, killed: {}.",
count_info.get(&State::Created).unwrap_or(&0),
count_info.get(&State::Ready).unwrap_or(&0),
count_info.get(&State::Submit).unwrap_or(&0),
Expand All @@ -50,85 +58,90 @@ pub async fn handle(
count_info.get(&State::Terminated(0)).unwrap_or(&0),
count_info.get(&State::Terminated(-1)).unwrap_or(&0),
);
let tasks = task_table.filter_by_states(states).await;
let task_table = task::Table::from_mapping(tasks);
let resp_msg = XMessage::BulkMessage(format!(
"{}\n\n{}\n",
task_table.render().await,
count_info,
));
framed_writer.send(resp_msg).await?;
}
let tasks = task_table.filter_by_states(states).await;
let task_table = task::Table::from_mapping(tasks);
let resp_msg = XMessage::BulkMessage(format!(
"{}\n\n{}",
task_table.render().await,
count_info,
));
framed_writer.send(resp_msg).await?;
}

// Signal direction - src: actioner, dst: coordinator
// Handle signal n/a -> Created
XMessage::ActionerOp(Operation::AddTask(record_id)) => {
// TODO: need to check if the task exist
// TODO: priority passed from operation
let task_ = Task::new(0, &record_id);
let id = task_table.create(task_.clone()).await;

// Signal direction - src: actioner, dst: coordinator
// Handle signal n/a -> Created
XMessage::ActionerOp(Operation::AddTask(n)) => {
// TODO: need to check if the task exist
for _ in 0..n {
let task_ = Task::new(0);
task_table.create(task_.clone()).await;
// send resp to actioner
let resp_msg = XMessage::BulkMessage(format!(
"Add task id={id}, map to task record_id={record_id} to run."
));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Ready
XMessage::ActionerOp(Operation::PlayTask(id)) => {
// TODO: need to check init state is able to be played
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Ready;
task_table.update(&id, task_).await?;
}

let resp_msg = XMessage::BulkMessage(format!("Add {n} tasks to run.\n",));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Ready
XMessage::ActionerOp(Operation::PlayTask(id)) => {
// TODO: need to check init state is able to be played
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Ready;
task_table.update(&id, task_).await?;
let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.",));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal all pause/created x -> Ready
XMessage::ActionerOp(Operation::PlayAllTask) => {
// TODO: also include pause state to resume
let resumable_tasks = task_table
.filter_by_states(vec![task::State::Created])
.await;

let resp_msg = XMessage::BulkMessage(format!("Launching task uuid={id}.\n",));
framed_writer.send(resp_msg).await?;
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal all pause/created x -> Ready
XMessage::ActionerOp(Operation::PlayAllTask) => {
// TODO: also include pause state to resume
let resumable_tasks = task_table
.filter_by_states(vec![task::State::Created])
.await;

for (task_id, _) in resumable_tasks {
let Some(mut task_) = task_table.read(&task_id).await else {
continue;
};
// XXX: check, is cloned?? so the old_state is different from after changed
let old_state = task_.state;

task_.state = task::State::Ready;
task_table.update(&task_id, task_).await?;
println!(
"Play task {task_id}: {} -> {}",
old_state,
task::State::Ready
);
for (task_id, _) in resumable_tasks {
let Some(mut task_) = task_table.read(&task_id).await else {
continue;
};
// XXX: check, is cloned?? so the old_state is different from after changed
let old_state = task_.state;

task_.state = task::State::Ready;
task_table.update(&task_id, task_).await?;
println!(
"Play task {task_id}: {} -> {}",
old_state,
task::State::Ready
);
}
}
}
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Terminated(-1)
XMessage::ActionerOp(Operation::KillTask(id)) => {
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Terminated(-1);
task_table.update(&id, task_).await?;
// Signal direction - src: actioner, dst: coordinator
// Handle signal x -> Terminated(-1)
XMessage::ActionerOp(Operation::KillTask(id)) => {
let task_ = task_table.read(&id).await;
if let Some(mut task_) = task_ {
task_.state = task::State::Terminated(-1);
task_table.update(&id, task_).await?;

// TODO: also sending a cancelling signal to the runnning task on worker

// TODO: also sending a cancelling signal to the runnning task on worker
let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",));
framed_writer.send(resp_msg).await?;
}
}

let resp_msg = XMessage::BulkMessage(format!("Kill task uuid={id}.\n",));
// boss is asking nonsense
_ => {
let resp_msg = XMessage::BulkMessage(format!(
"Shutup, I try to ignore you, since you say '{msg:#?}'"
));
framed_writer.send(resp_msg).await?;
}
}
_ => {
let resp_msg = XMessage::BulkMessage(format!(
"Shutup, I try to ignore you, since you say '{msg:#?}'"
));
framed_writer.send(resp_msg).await?;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn assign(worker_table: WorkerTable, task_table: TaskTable) -> anyhow:
continue;
};

if let Err(e) = worker.launch_task(&task_id).await {
if let Err(e) = worker.launch_task(&task_id, &task).await {
eprintln!("Failed to send message: {e}");
continue;
}
Expand Down
Loading

0 comments on commit 5f96a64

Please sign in to comment.