From c8fd4a66cd03c53decc3d1f773ba39e2318e1469 Mon Sep 17 00:00:00 2001 From: Petar Vujovic Date: Mon, 1 Jul 2024 14:14:36 +0200 Subject: [PATCH] fix(task_db): add async and mutable variables --- task_manager/src/adv_sqlite.rs | 1 + task_manager/src/lib.rs | 87 ++++++++++++++++++---------------- task_manager/src/mem_db.rs | 19 ++++---- 3 files changed, 58 insertions(+), 49 deletions(-) diff --git a/task_manager/src/adv_sqlite.rs b/task_manager/src/adv_sqlite.rs index ef3c48e9f..25cd215a4 100644 --- a/task_manager/src/adv_sqlite.rs +++ b/task_manager/src/adv_sqlite.rs @@ -747,6 +747,7 @@ impl TaskDb { } } +#[async_trait::async_trait] impl TaskManager for SqliteTaskManager { fn new(opts: &TaskManagerOpts) -> Self { static INIT: Once = Once::new(); diff --git a/task_manager/src/lib.rs b/task_manager/src/lib.rs index ea086b28d..5f3cb8adb 100644 --- a/task_manager/src/lib.rs +++ b/task_manager/src/lib.rs @@ -202,6 +202,7 @@ pub struct TaskManagerWrapper { manager: TaskManagerInstance, } +#[async_trait::async_trait] impl TaskManager for TaskManagerWrapper { fn new(opts: &TaskManagerOpts) -> Self { let manager = if cfg!(feature = "sqlite") { @@ -213,17 +214,17 @@ impl TaskManager for TaskManagerWrapper { Self { manager } } - fn enqueue_task( + async fn enqueue_task( &mut self, request: &EnqueueTaskParams, ) -> TaskManagerResult { match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => manager.enqueue_task(request), - TaskManagerInstance::Sqlite(ref mut manager) => manager.enqueue_task(request), + TaskManagerInstance::InMemory(ref mut manager) => manager.enqueue_task(request).await, + TaskManagerInstance::Sqlite(ref mut manager) => manager.enqueue_task(request).await, } } - fn update_task_progress( + async fn update_task_progress( &mut self, chain_id: ChainId, blockhash: B256, @@ -233,26 +234,20 @@ impl TaskManager for TaskManagerWrapper { proof: Option<&[u8]>, ) -> TaskManagerResult<()> { match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => manager.update_task_progress( - chain_id, - blockhash, - proof_system, - prover, - status, - proof, - ), - TaskManagerInstance::Sqlite(ref mut manager) => manager.update_task_progress( - chain_id, - blockhash, - proof_system, - prover, - status, - proof, - ), + TaskManagerInstance::InMemory(ref mut manager) => { + manager + .update_task_progress(chain_id, blockhash, proof_system, prover, status, proof) + .await + } + TaskManagerInstance::Sqlite(ref mut manager) => { + manager + .update_task_progress(chain_id, blockhash, proof_system, prover, status, proof) + .await + } } } - fn get_task_proving_status( + async fn get_task_proving_status( &mut self, chain_id: ChainId, blockhash: B256, @@ -261,29 +256,33 @@ impl TaskManager for TaskManagerWrapper { ) -> TaskManagerResult { match &mut self.manager { TaskManagerInstance::InMemory(ref mut manager) => { - manager.get_task_proving_status(chain_id, blockhash, proof_system, prover) + manager + .get_task_proving_status(chain_id, blockhash, proof_system, prover) + .await } TaskManagerInstance::Sqlite(ref mut manager) => { - manager.get_task_proving_status(chain_id, blockhash, proof_system, prover) + manager + .get_task_proving_status(chain_id, blockhash, proof_system, prover) + .await } } } - fn get_task_proving_status_by_id( + async fn get_task_proving_status_by_id( &mut self, task_id: u64, ) -> TaskManagerResult { match &mut self.manager { TaskManagerInstance::InMemory(ref mut manager) => { - manager.get_task_proving_status_by_id(task_id) + manager.get_task_proving_status_by_id(task_id).await } TaskManagerInstance::Sqlite(ref mut manager) => { - manager.get_task_proving_status_by_id(task_id) + manager.get_task_proving_status_by_id(task_id).await } } } - fn get_task_proof( + async fn get_task_proof( &mut self, chain_id: ChainId, blockhash: B256, @@ -292,32 +291,40 @@ impl TaskManager for TaskManagerWrapper { ) -> TaskManagerResult> { match &mut self.manager { TaskManagerInstance::InMemory(ref mut manager) => { - manager.get_task_proof(chain_id, blockhash, proof_system, prover) + manager + .get_task_proof(chain_id, blockhash, proof_system, prover) + .await } TaskManagerInstance::Sqlite(ref mut manager) => { - manager.get_task_proof(chain_id, blockhash, proof_system, prover) + manager + .get_task_proof(chain_id, blockhash, proof_system, prover) + .await } } } - fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { + async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => manager.get_task_proof_by_id(task_id), - TaskManagerInstance::Sqlite(ref mut manager) => manager.get_task_proof_by_id(task_id), + TaskManagerInstance::InMemory(ref mut manager) => { + manager.get_task_proof_by_id(task_id).await + } + TaskManagerInstance::Sqlite(ref mut manager) => { + manager.get_task_proof_by_id(task_id).await + } } } - fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { + async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => manager.get_db_size(), - TaskManagerInstance::Sqlite(ref mut manager) => manager.get_db_size(), + TaskManagerInstance::InMemory(ref mut manager) => manager.get_db_size().await, + TaskManagerInstance::Sqlite(ref mut manager) => manager.get_db_size().await, } } - fn prune_db(&mut self) -> TaskManagerResult<()> { + async fn prune_db(&mut self) -> TaskManagerResult<()> { match &mut self.manager { - TaskManagerInstance::InMemory(ref mut manager) => manager.prune_db(), - TaskManagerInstance::Sqlite(ref mut manager) => manager.prune_db(), + TaskManagerInstance::InMemory(ref mut manager) => manager.prune_db().await, + TaskManagerInstance::Sqlite(ref mut manager) => manager.prune_db().await, } } } @@ -331,8 +338,8 @@ mod test { use super::*; use std::path::Path; - #[test] - fn test_new_taskmanager() { + #[tokio::test] + async fn test_new_taskmanager() { let sqlite_file: &Path = Path::new("test.db"); // remove existed one if sqlite_file.exists() { diff --git a/task_manager/src/mem_db.rs b/task_manager/src/mem_db.rs index 9ee698d64..4b3287287 100644 --- a/task_manager/src/mem_db.rs +++ b/task_manager/src/mem_db.rs @@ -190,6 +190,7 @@ impl InMemoryTaskDb { } } +#[async_trait::async_trait] impl TaskManager for InMemoryTaskManager { fn new(_opts: &TaskManagerOpts) -> Self { static INIT: Once = Once::new(); @@ -212,7 +213,7 @@ impl TaskManager for InMemoryTaskManager { &mut self, params: &EnqueueTaskParams, ) -> TaskManagerResult { - let db = self.db.lock().await; + let mut db = self.db.lock().await; if let Ok(proving_status) = db.get_task_proving_status( params.chain_id, params.blockhash, @@ -241,7 +242,7 @@ impl TaskManager for InMemoryTaskManager { status: TaskStatus, proof: Option<&[u8]>, ) -> TaskManagerResult<()> { - let db = self.db.lock().await; + let mut db = self.db.lock().await; db.update_task_progress(chain_id, blockhash, proof_system, prover, status, proof)?; Ok(()) } @@ -254,16 +255,16 @@ impl TaskManager for InMemoryTaskManager { proof_system: ProofType, prover: Option, ) -> TaskManagerResult { - let db = self.db.lock().await; + let mut db = self.db.lock().await; db.get_task_proving_status(chain_id, blockhash, proof_system, prover) } /// Returns the latest triplet (submitter or fulfiller, status, last update time) - fn get_task_proving_status_by_id( + async fn get_task_proving_status_by_id( &mut self, task_id: u64, ) -> TaskManagerResult { - let db = self.db.lock().await; + let mut db = self.db.lock().await; let proving_status = db.get_task_proving_status_by_id(task_id)?; Ok(proving_status) } @@ -275,25 +276,25 @@ impl TaskManager for InMemoryTaskManager { proof_system: ProofType, prover: Option, ) -> TaskManagerResult> { - let db = self.db.lock().await; + let mut db = self.db.lock().await; let proof = db.get_task_proof(chain_id, blockhash, proof_system, prover)?; Ok(proof) } async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult> { - let db = self.db.lock().await; + let mut db = self.db.lock().await; let proof = db.get_task_proof_by_id(task_id)?; Ok(proof) } /// Returns the total and detailed database size async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> { - let db = self.db.lock().await; + let mut db = self.db.lock().await; db.size() } async fn prune_db(&mut self) -> TaskManagerResult<()> { - let db = self.db.lock().await; + let mut db = self.db.lock().await; db.prune() } }