Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task_db): sqlite and in memory abstraction #301

Merged
merged 12 commits into from
Jul 2, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ tokio = { version = "^1.23", features = ["full"] }
hyper = { version = "0.14.27", features = ["server"] }
reqwest = { version = "0.11.22", features = ["json"] }
url = "2.5.0"
async-trait = "0.1.80"

# crypto
c-kzg = { git = "https://github.com/brechtpd/c-kzg-4844", branch = "for-alpha7", default-features = false, features = [
Expand Down
93 changes: 48 additions & 45 deletions host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,17 @@ impl ProverState {
};
let proof_result: HostResult<ProofResponse> = async move {
{
let manager_binding = get_task_manager(task_manager_opts);
let mut manager = manager_binding.lock().unwrap();
manager.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
TaskStatus::WorkInProgress,
None,
)?;
let mut manager = get_task_manager(task_manager_opts);
manager
.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
TaskStatus::WorkInProgress,
None,
)
.await?;
}
handle_proof(&proof_request_clone, &opts_clone, &chain_specs_clone).await
}
Expand All @@ -199,47 +200,49 @@ impl ProverState {
let _: HostResult<()> = async move {
let proof = proof.proof.unwrap();
let proof = proof.as_bytes();
let manager_binding = get_task_manager(task_manager_opts);
let mut manager = manager_binding.lock().unwrap();
manager.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
TaskStatus::WorkInProgress,
Some(proof),
)?;
let mut manager = get_task_manager(task_manager_opts);
manager
.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
TaskStatus::WorkInProgress,
Some(proof),
)
.await?;
Ok(())
}
.await;
}
Err(error) => {
let _: HostResult<()> = async move {
let manager_binding = get_task_manager(task_manager_opts);
let mut manager = manager_binding.lock().unwrap();
manager.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
match error {
HostError::HandleDropped
| HostError::CapacityFull
| HostError::JoinHandle(_)
| HostError::InvalidAddress(_)
| HostError::InvalidRequestConfig(_) => unreachable!(),
HostError::Conversion(_)
| HostError::Serde(_)
| HostError::Core(_)
| HostError::Anyhow(_)
| HostError::FeatureNotSupportedError(_)
| HostError::Io(_) => TaskStatus::UnspecifiedFailureReason,
HostError::RPC(_) => TaskStatus::NetworkFailure,
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
},
None,
)?;
let mut manager = get_task_manager(task_manager_opts);
manager
.update_task_progress(
chain_id,
blockhash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
match error {
HostError::HandleDropped
| HostError::CapacityFull
| HostError::JoinHandle(_)
| HostError::InvalidAddress(_)
| HostError::InvalidRequestConfig(_) => unreachable!(),
HostError::Conversion(_)
| HostError::Serde(_)
| HostError::Core(_)
| HostError::Anyhow(_)
| HostError::FeatureNotSupportedError(_)
| HostError::Io(_) => TaskStatus::UnspecifiedFailureReason,
HostError::RPC(_) => TaskStatus::NetworkFailure,
HostError::Guest(_) => TaskStatus::ProofFailure_Generic,
HostError::TaskManager(_) => TaskStatus::SqlDbCorruption,
},
None,
)
.await?;
Ok(())
}
.await;
Expand Down
47 changes: 26 additions & 21 deletions host/src/server/api/v2/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ async fn proof_handler(
)
.await?;

let manager_binding = get_task_manager(&TaskManagerOpts {
let mut manager = get_task_manager(&TaskManagerOpts {
sqlite_file: prover_state.opts.sqlite_file.clone(),
max_db_size: prover_state.opts.max_db_size,
});
let mut manager = manager_binding.lock().unwrap();
let status = manager.get_task_proving_status(
chain_id,
block_hash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
)?;
let status = manager
.get_task_proving_status(
chain_id,
block_hash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
)
.await?;

if status.is_empty() {
prover_state.task_channel.try_send((
Expand All @@ -77,25 +78,29 @@ async fn proof_handler(
prover_state.chain_specs,
))?;

manager.enqueue_task(&EnqueueTaskParams {
chain_id,
blockhash: block_hash,
proof_system: proof_request.proof_type,
prover: proof_request.prover.to_string(),
block_number: proof_request.block_number,
})?;
manager
.enqueue_task(&EnqueueTaskParams {
chain_id,
blockhash: block_hash,
proof_type: proof_request.proof_type,
prover: proof_request.prover.to_string(),
block_number: proof_request.block_number,
})
.await?;
return Ok(Json(serde_json::json!("{}")));
}

let status = status.first().unwrap().0;

if matches!(status, TaskStatus::Success) {
let proof = manager.get_task_proof(
chain_id,
block_hash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
)?;
let proof = manager
.get_task_proof(
chain_id,
block_hash,
proof_request.proof_type,
Some(proof_request.prover.to_string()),
)
.await?;

let response = ProofResponse {
proof: Some(String::from_utf8(proof).unwrap()),
Expand Down
8 changes: 7 additions & 1 deletion task_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ serde_json = { workspace = true }
hex = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }

[dev-dependencies]
rand = "0.9.0-alpha.1" # This is an alpha version, that has rng.gen_iter::<T>()
rand_chacha = "0.9.0-alpha.1"
tempfile = "3.10.1"
alloy-primitives = { workspace = true, features = ["getrandom"] }

rusqlite = { workspace = true, features = ["trace"] }

[features]
default = ["in-memory"]
sqlite = []
in-memory = []

[[test]]
name = "task_manager_tests"
path = "tests/main.rs"
Loading
Loading