Skip to content

Commit

Permalink
feat(proof-data-handler): exclude batches without object file in GCS
Browse files Browse the repository at this point in the history
/tee/proof_inputs endpoint no longer returns batches that have no
corresponding object file in Google Cloud Storage for an extended
period.

Since the recent `24.25.0` redeployment on `mainnet`, we've been
[flooded with warnings][warnings] for the `proof-data-handler` on
`mainnet` (the warnings are actually _not_ fatal in this context):

```
Failed request with a fatal error

(...)

Blobs for batch numbers 490520 to 490555 not found in the object store.
Marked as unpicked.
```

The issue was caused [by the code][code] behind the `/tee/proof_inputs`
[endpoint][endpoint_proof_inputs] (which is equivalent to the
`/proof_generation_data` [endpoint][endpoint_proof_generation_data]) –
it finds the next batch to send to the [requesting][requesting]
`tee-prover` by looking for the first batch that has a corresponding
object in the Google object store. As it skips over batches that don’t
have the objects, [it logs][logging] `Failed request with a fatal error`
for each one (unless the skipped batch was successfully proven, in which
case it doesn’t log the error). This happens with every
[request][request] the `tee-prover` sends, which is why we were getting
so much noise in the logs.

One possible solution was to manually flag the problematic batches as
`permanently_ignored`, like Thomas [did before][Thomas] on `mainnet`.
It was a quick and dirty workaround, but now we have a more automated
solution.

[warnings]: https://grafana.matterlabs.dev/goto/TjlaXQgHg?orgId=1
[code]: https://github.com/matter-labs/zksync-era/blob/3f406c7d0c0e76d798c2d838abde57ca692822c0/core/node/proof_data_handler/src/tee_request_processor.rs#L35-L79
[endpoint_proof_inputs]: https://github.com/matter-labs/zksync-era/blob/3f406c7d0c0e76d798c2d838abde57ca692822c0/core/node/proof_data_handler/src/lib.rs#L96
[endpoint_proof_generation_data]: https://github.com/matter-labs/zksync-era/blob/3f406c7d0c0e76d798c2d838abde57ca692822c0/core/node/proof_data_handler/src/lib.rs#L67
[requesting]: https://github.com/matter-labs/zksync-era/blob/3f406c7d0c0e76d798c2d838abde57ca692822c0/core/bin/zksync_tee_prover/src/tee_prover.rs#L93
[logging]: https://github.com/matter-labs/zksync-era/blob/3f406c7d0c0e76d798c2d838abde57ca692822c0/core/lib/object_store/src/retries.rs#L56
[Thomas]: https://matter-labs-workspace.slack.com/archives/C05ANUCGCKV/p1725284962312929
  • Loading branch information
pbeza committed Oct 30, 2024
1 parent 8db7e93 commit b3cb2a3
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 29 deletions.
1 change: 0 additions & 1 deletion core/lib/basic_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod prover_dal;
pub mod pubdata_da;
pub mod secrets;
pub mod settlement;
pub mod tee_types;
pub mod url;
pub mod vm;
pub mod web3;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Some tee_proof_generation_details rows were manually flagged with the misspelled status
-- 'permanently_ignore'; normalize them to the canonical spelling 'permanently_ignored'.
UPDATE tee_proof_generation_details SET status = 'permanently_ignored' WHERE status = 'permanently_ignore';
16 changes: 16 additions & 0 deletions core/lib/dal/src/models/storage_tee_proof.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::NaiveDateTime;
use zksync_types::{tee_types::LockedBatch, L1BatchNumber};

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StorageTeeProof {
Expand All @@ -8,3 +9,18 @@ pub struct StorageTeeProof {
pub updated_at: NaiveDateTime,
pub attestation: Option<Vec<u8>>,
}

/// Database-side representation of a locked TEE batch row, as returned by `query_as!` in
/// `tee_proof_generation_dal`. Converted into the domain type `LockedBatch` via `From`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StorageLockedBatch {
    /// Raw batch number as stored in Postgres (`BIGINT`); narrowed to `u32` on conversion.
    pub l1_batch_number: i64,
    /// Creation time of the proof-generation job for this batch; used to decide whether a
    /// batch with no object-store blob should be flagged as permanently ignored.
    pub created_at: NaiveDateTime,
}

impl From<StorageLockedBatch> for LockedBatch {
fn from(tx: StorageLockedBatch) -> LockedBatch {
LockedBatch {
l1_batch_number: L1BatchNumber::from(tx.l1_batch_number as u32),
created_at: tx.created_at,
}
}
}
32 changes: 22 additions & 10 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,31 @@ use zksync_db_connection::{
instrument::{InstrumentExt, Instrumented},
utils::pg_interval_from_duration,
};
use zksync_types::{tee_types::TeeType, L1BatchNumber};
use zksync_types::{
tee_types::{LockedBatch, TeeType},
L1BatchNumber,
};

use crate::{models::storage_tee_proof::StorageTeeProof, Core};
use crate::{
models::storage_tee_proof::{StorageLockedBatch, StorageTeeProof},
Core,
};

#[derive(Debug)]
pub struct TeeProofGenerationDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Core>,
}

#[derive(Debug, EnumString, Display)]
enum TeeProofGenerationJobStatus {
pub enum TeeProofGenerationJobStatus {
#[strum(serialize = "unpicked")]
Unpicked,
#[strum(serialize = "picked_by_prover")]
PickedByProver,
#[strum(serialize = "generated")]
Generated,
#[strum(serialize = "permanently_ignored")]
PermanentlyIgnored,
}

impl TeeProofGenerationDal<'_, '_> {
Expand All @@ -33,10 +41,12 @@ impl TeeProofGenerationDal<'_, '_> {
tee_type: TeeType,
processing_timeout: Duration,
min_batch_number: L1BatchNumber,
) -> DalResult<Option<L1BatchNumber>> {
) -> DalResult<Option<LockedBatch>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let min_batch_number = i64::from(min_batch_number.0);
sqlx::query!(

sqlx::query_as!(
StorageLockedBatch,
r#"
WITH upsert AS (
SELECT
Expand Down Expand Up @@ -66,7 +76,7 @@ impl TeeProofGenerationDal<'_, '_> {
)
FETCH FIRST ROW ONLY
)
INSERT INTO
tee_proof_generation_details (
l1_batch_number, tee_type, status, created_at, updated_at, prover_taken_at
Expand All @@ -87,7 +97,8 @@ impl TeeProofGenerationDal<'_, '_> {
updated_at = NOW(),
prover_taken_at = NOW()
RETURNING
l1_batch_number
l1_batch_number,
created_at
"#,
tee_type.to_string(),
TeeProofGenerationJobStatus::PickedByProver.to_string(),
Expand All @@ -100,14 +111,15 @@ impl TeeProofGenerationDal<'_, '_> {
.with_arg("processing_timeout", &processing_timeout)
.with_arg("l1_batch_number", &min_batch_number)
.fetch_optional(self.storage)
.await
.map(|record| record.map(|record| L1BatchNumber(record.l1_batch_number as u32)))
.await?
.map(Into::into)
}

pub async fn unlock_batch(
&mut self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
status: TeeProofGenerationJobStatus,
) -> DalResult<()> {
let batch_number = i64::from(l1_batch_number.0);
sqlx::query!(
Expand All @@ -120,7 +132,7 @@ impl TeeProofGenerationDal<'_, '_> {
l1_batch_number = $2
AND tee_type = $3
"#,
TeeProofGenerationJobStatus::Unpicked.to_string(),
status.to_string(),
batch_number,
tee_type.to_string()
)
Expand Down
13 changes: 12 additions & 1 deletion core/lib/object_store/src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ impl Request<'_> {
self,
store: &impl fmt::Debug,
max_retries: u16,
f: F,
) -> Result<T, ObjectStoreError>
where
Fut: Future<Output = Result<T, ObjectStoreError>>,
F: FnMut() -> Fut,
{
self.retry_internal(max_retries, f).await
}

async fn retry_internal<T, Fut, F>(
&self,
max_retries: u16,
mut f: F,
) -> Result<T, ObjectStoreError>
where
Expand All @@ -53,7 +65,6 @@ impl Request<'_> {
backoff_secs *= 2;
}
Err(err) => {
tracing::warn!(%err, "Failed request with a fatal error");
break Err(err);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/types/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde_json::Value;
use serde_with::{hex::Hex, serde_as};
use strum::Display;
use zksync_basic_types::{
tee_types::TeeType,
web3::{AccessList, Bytes, Index},
Bloom, L1BatchNumber, H160, H256, H64, U256, U64,
};
Expand All @@ -16,6 +15,7 @@ pub use crate::transaction_request::{
use crate::{
debug_flat_call::{DebugCallFlat, ResultDebugCallFlat},
protocol_version::L1VerifierConfig,
tee_types::TeeType,
Address, L2BlockNumber, ProtocolVersionId,
};

Expand Down
1 change: 1 addition & 0 deletions core/lib/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub mod protocol_upgrade;
pub mod snapshots;
pub mod storage;
pub mod system_contracts;
pub mod tee_types;
pub mod tokens;
pub mod tx;
pub mod zk_evm_types;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt;

use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use zksync_basic_types::L1BatchNumber;

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
Expand All @@ -17,6 +19,17 @@ impl fmt::Display for TeeType {
}
}

/// Representation of a locked batch. Used in DAL to fetch details about the locked batch to
/// determine whether it should be flagged as permanently ignored if it has no corresponding file in
/// the object store for an extended period. Produced from the database row type
/// `StorageLockedBatch` via its `From` conversion.
#[derive(Clone, Debug)]
pub struct LockedBatch {
    /// Locked batch number.
    pub l1_batch_number: L1BatchNumber,
    /// The creation time of the job for this batch, as stored in Postgres (naive UTC).
    pub created_at: NaiveDateTime,
}

#[cfg(test)]
mod tests {
use serde_json;
Expand Down
4 changes: 2 additions & 2 deletions core/node/proof_data_handler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ axum.workspace = true
tokio.workspace = true
tower-http = { workspace = true, features = ["compression-zstd", "decompression-zstd"] }
tracing.workspace = true
chrono.workspace = true

[dev-dependencies]
hyper.workspace = true
chrono.workspace = true
zksync_multivm.workspace = true
serde_json.workspace = true
tower.workspace = true
zksync_basic_types.workspace = true
zksync_contracts.workspace = true
zksync_basic_types.workspace = true
49 changes: 35 additions & 14 deletions core/node/proof_data_handler/src/tee_request_processor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::sync::Arc;

use axum::{extract::Path, Json};
use chrono::{Duration as ChronoDuration, TimeZone, Utc};
use zksync_config::configs::ProofDataHandlerConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_dal::{
tee_proof_generation_dal::TeeProofGenerationJobStatus, ConnectionPool, Core, CoreDal,
};
use zksync_object_store::{ObjectStore, ObjectStoreError};
use zksync_prover_interface::{
api::{
Expand All @@ -13,7 +16,10 @@ use zksync_prover_interface::{
TeeVerifierInput, V1TeeVerifierInput, VMRunWitnessInputData, WitnessInputMerklePaths,
},
};
use zksync_types::{tee_types::TeeType, L1BatchNumber, L2ChainId};
use zksync_types::{
tee_types::{LockedBatch, TeeType},
L1BatchNumber, L2ChainId,
};
use zksync_vm_executor::storage::L1BatchParamsProvider;

use crate::errors::RequestProcessorError;
Expand Down Expand Up @@ -47,43 +53,57 @@ impl TeeRequestProcessor {
) -> Result<Option<Json<TeeProofGenerationDataResponse>>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let batch_ignored_timeout = ChronoDuration::days(10);
let mut min_batch_number = self.config.tee_config.first_tee_processed_batch;
let mut missing_range: Option<(L1BatchNumber, L1BatchNumber)> = None;

let result = loop {
let Some(l1_batch_number) = self
let Some(locked_batch) = self
.lock_batch_for_proving(request.tee_type, min_batch_number)
.await?
else {
// No job available
return Ok(None);
return Ok(None); // no job available
};
let batch_number = locked_batch.l1_batch_number;

match self
.tee_verifier_input_for_existing_batch(l1_batch_number)
.tee_verifier_input_for_existing_batch(batch_number)
.await
{
Ok(input) => {
break Ok(Some(Json(TeeProofGenerationDataResponse(Box::new(input)))));
}
Err(RequestProcessorError::ObjectStore(ObjectStoreError::KeyNotFound(_))) => {
missing_range = match missing_range {
Some((start, _)) => Some((start, l1_batch_number)),
None => Some((l1_batch_number, l1_batch_number)),
Some((start, _)) => Some((start, batch_number)),
None => Some((batch_number, batch_number)),
};
let datetime_utc = Utc.from_utc_datetime(&locked_batch.created_at);
let duration = Utc::now().signed_duration_since(datetime_utc);
let status = if duration > batch_ignored_timeout {
TeeProofGenerationJobStatus::PermanentlyIgnored
} else {
TeeProofGenerationJobStatus::Unpicked
};
self.unlock_batch(l1_batch_number, request.tee_type).await?;
min_batch_number = l1_batch_number + 1;
self.unlock_batch(batch_number, request.tee_type, status)
.await?;
min_batch_number = Some(min_batch_number.unwrap_or(batch_number) + 1);
}
Err(err) => {
self.unlock_batch(l1_batch_number, request.tee_type).await?;
self.unlock_batch(
batch_number,
request.tee_type,
TeeProofGenerationJobStatus::Unpicked,
)
.await?;
break Err(err);
}
}
};

if let Some((start, end)) = missing_range {
tracing::warn!(
"Blobs for batch numbers {} to {} not found in the object store. Marked as unpicked.",
"Blobs for batch numbers {} to {} not found in the object store. Marked as unpicked or permanently ignored.",
start,
end
);
Expand Down Expand Up @@ -157,7 +177,7 @@ impl TeeRequestProcessor {
&self,
tee_type: TeeType,
min_batch_number: L1BatchNumber,
) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
) -> Result<Option<LockedBatch>, RequestProcessorError> {
self.pool
.connection_tagged("tee_request_processor")
.await?
Expand All @@ -175,12 +195,13 @@ impl TeeRequestProcessor {
&self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
status: TeeProofGenerationJobStatus,
) -> Result<(), RequestProcessorError> {
self.pool
.connection_tagged("tee_request_processor")
.await?
.tee_proof_generation_dal()
.unlock_batch(l1_batch_number, tee_type)
.unlock_batch(l1_batch_number, tee_type, status)
.await?;
Ok(())
}
Expand Down

0 comments on commit b3cb2a3

Please sign in to comment.