Skip to content

Commit

Permalink
fix(task_db): add async and mutable variables
Browse files Browse the repository at this point in the history
  • Loading branch information
petarvujovic98 committed Jul 1, 2024
1 parent c1536e9 commit c8fd4a6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 49 deletions.
1 change: 1 addition & 0 deletions task_manager/src/adv_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ impl TaskDb {
}
}

#[async_trait::async_trait]
impl TaskManager for SqliteTaskManager {
fn new(opts: &TaskManagerOpts) -> Self {
static INIT: Once = Once::new();
Expand Down
87 changes: 47 additions & 40 deletions task_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub struct TaskManagerWrapper {
manager: TaskManagerInstance,
}

#[async_trait::async_trait]
impl TaskManager for TaskManagerWrapper {
fn new(opts: &TaskManagerOpts) -> Self {
let manager = if cfg!(feature = "sqlite") {
Expand All @@ -213,17 +214,17 @@ impl TaskManager for TaskManagerWrapper {
Self { manager }
}

fn enqueue_task(
async fn enqueue_task(
&mut self,
request: &EnqueueTaskParams,
) -> TaskManagerResult<TaskProvingStatusRecords> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => manager.enqueue_task(request),
TaskManagerInstance::Sqlite(ref mut manager) => manager.enqueue_task(request),
TaskManagerInstance::InMemory(ref mut manager) => manager.enqueue_task(request).await,
TaskManagerInstance::Sqlite(ref mut manager) => manager.enqueue_task(request).await,
}
}

fn update_task_progress(
async fn update_task_progress(
&mut self,
chain_id: ChainId,
blockhash: B256,
Expand All @@ -233,26 +234,20 @@ impl TaskManager for TaskManagerWrapper {
proof: Option<&[u8]>,
) -> TaskManagerResult<()> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => manager.update_task_progress(
chain_id,
blockhash,
proof_system,
prover,
status,
proof,
),
TaskManagerInstance::Sqlite(ref mut manager) => manager.update_task_progress(
chain_id,
blockhash,
proof_system,
prover,
status,
proof,
),
TaskManagerInstance::InMemory(ref mut manager) => {
manager
.update_task_progress(chain_id, blockhash, proof_system, prover, status, proof)
.await
}
TaskManagerInstance::Sqlite(ref mut manager) => {
manager
.update_task_progress(chain_id, blockhash, proof_system, prover, status, proof)
.await
}
}
}

fn get_task_proving_status(
async fn get_task_proving_status(
&mut self,
chain_id: ChainId,
blockhash: B256,
Expand All @@ -261,29 +256,33 @@ impl TaskManager for TaskManagerWrapper {
) -> TaskManagerResult<TaskProvingStatusRecords> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => {
manager.get_task_proving_status(chain_id, blockhash, proof_system, prover)
manager
.get_task_proving_status(chain_id, blockhash, proof_system, prover)
.await
}
TaskManagerInstance::Sqlite(ref mut manager) => {
manager.get_task_proving_status(chain_id, blockhash, proof_system, prover)
manager
.get_task_proving_status(chain_id, blockhash, proof_system, prover)
.await
}
}
}

fn get_task_proving_status_by_id(
async fn get_task_proving_status_by_id(
&mut self,
task_id: u64,
) -> TaskManagerResult<TaskProvingStatusRecords> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => {
manager.get_task_proving_status_by_id(task_id)
manager.get_task_proving_status_by_id(task_id).await
}
TaskManagerInstance::Sqlite(ref mut manager) => {
manager.get_task_proving_status_by_id(task_id)
manager.get_task_proving_status_by_id(task_id).await
}
}
}

fn get_task_proof(
async fn get_task_proof(
&mut self,
chain_id: ChainId,
blockhash: B256,
Expand All @@ -292,32 +291,40 @@ impl TaskManager for TaskManagerWrapper {
) -> TaskManagerResult<Vec<u8>> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => {
manager.get_task_proof(chain_id, blockhash, proof_system, prover)
manager
.get_task_proof(chain_id, blockhash, proof_system, prover)
.await
}
TaskManagerInstance::Sqlite(ref mut manager) => {
manager.get_task_proof(chain_id, blockhash, proof_system, prover)
manager
.get_task_proof(chain_id, blockhash, proof_system, prover)
.await
}
}
}

fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult<Vec<u8>> {
async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult<Vec<u8>> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => manager.get_task_proof_by_id(task_id),
TaskManagerInstance::Sqlite(ref mut manager) => manager.get_task_proof_by_id(task_id),
TaskManagerInstance::InMemory(ref mut manager) => {
manager.get_task_proof_by_id(task_id).await
}
TaskManagerInstance::Sqlite(ref mut manager) => {
manager.get_task_proof_by_id(task_id).await
}
}
}

fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> {
async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => manager.get_db_size(),
TaskManagerInstance::Sqlite(ref mut manager) => manager.get_db_size(),
TaskManagerInstance::InMemory(ref mut manager) => manager.get_db_size().await,
TaskManagerInstance::Sqlite(ref mut manager) => manager.get_db_size().await,
}
}

fn prune_db(&mut self) -> TaskManagerResult<()> {
async fn prune_db(&mut self) -> TaskManagerResult<()> {
match &mut self.manager {
TaskManagerInstance::InMemory(ref mut manager) => manager.prune_db(),
TaskManagerInstance::Sqlite(ref mut manager) => manager.prune_db(),
TaskManagerInstance::InMemory(ref mut manager) => manager.prune_db().await,
TaskManagerInstance::Sqlite(ref mut manager) => manager.prune_db().await,
}
}
}
Expand All @@ -331,8 +338,8 @@ mod test {
use super::*;
use std::path::Path;

#[test]
fn test_new_taskmanager() {
#[tokio::test]
async fn test_new_taskmanager() {
let sqlite_file: &Path = Path::new("test.db");
// remove existed one
if sqlite_file.exists() {
Expand Down
19 changes: 10 additions & 9 deletions task_manager/src/mem_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl InMemoryTaskDb {
}
}

#[async_trait::async_trait]
impl TaskManager for InMemoryTaskManager {
fn new(_opts: &TaskManagerOpts) -> Self {
static INIT: Once = Once::new();
Expand All @@ -212,7 +213,7 @@ impl TaskManager for InMemoryTaskManager {
&mut self,
params: &EnqueueTaskParams,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
if let Ok(proving_status) = db.get_task_proving_status(
params.chain_id,
params.blockhash,
Expand Down Expand Up @@ -241,7 +242,7 @@ impl TaskManager for InMemoryTaskManager {
status: TaskStatus,
proof: Option<&[u8]>,
) -> TaskManagerResult<()> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
db.update_task_progress(chain_id, blockhash, proof_system, prover, status, proof)?;
Ok(())
}
Expand All @@ -254,16 +255,16 @@ impl TaskManager for InMemoryTaskManager {
proof_system: ProofType,
prover: Option<String>,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
db.get_task_proving_status(chain_id, blockhash, proof_system, prover)
}

/// Returns the latest triplet (submitter or fulfiller, status, last update time)
fn get_task_proving_status_by_id(
async fn get_task_proving_status_by_id(
&mut self,
task_id: u64,
) -> TaskManagerResult<TaskProvingStatusRecords> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
let proving_status = db.get_task_proving_status_by_id(task_id)?;
Ok(proving_status)
}
Expand All @@ -275,25 +276,25 @@ impl TaskManager for InMemoryTaskManager {
proof_system: ProofType,
prover: Option<String>,
) -> TaskManagerResult<Vec<u8>> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
let proof = db.get_task_proof(chain_id, blockhash, proof_system, prover)?;
Ok(proof)
}

async fn get_task_proof_by_id(&mut self, task_id: u64) -> TaskManagerResult<Vec<u8>> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
let proof = db.get_task_proof_by_id(task_id)?;
Ok(proof)
}

/// Returns the total and detailed database size
async fn get_db_size(&mut self) -> TaskManagerResult<(usize, Vec<(String, usize)>)> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
db.size()
}

async fn prune_db(&mut self) -> TaskManagerResult<()> {
let db = self.db.lock().await;
let mut db = self.db.lock().await;
db.prune()
}
}
Expand Down

0 comments on commit c8fd4a6

Please sign in to comment.