diff --git a/Cargo.lock b/Cargo.lock index 6c6a8d96123f..f60faf9fdf96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8050,6 +8050,7 @@ dependencies = [ "rand 0.8.5", "tokio", "tracing", + "vise", "zksync_config", "zksync_contracts", "zksync_dal", @@ -8057,6 +8058,7 @@ dependencies = [ "zksync_external_price_api", "zksync_node_fee_model", "zksync_types", + "zksync_utils", ] [[package]] diff --git a/core/lib/config/src/configs/base_token_adjuster.rs b/core/lib/config/src/configs/base_token_adjuster.rs index 0ae451a62d9c..c8a0fe6312e3 100644 --- a/core/lib/config/src/configs/base_token_adjuster.rs +++ b/core/lib/config/src/configs/base_token_adjuster.rs @@ -26,6 +26,12 @@ const DEFAULT_L1_TX_SENDING_MAX_ATTEMPTS: u32 = 3; /// Default number of milliseconds to sleep between receipt checking attempts const DEFAULT_L1_RECEIPT_CHECKING_SLEEP_MS: u64 = 30_000; +/// Default maximum number of attempts to fetch price from a remote API +const DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS: u32 = 3; + +/// Default number of milliseconds to sleep between price fetching attempts +const DEFAULT_PRICE_FETCHING_SLEEP_MS: u64 = 5_000; + /// Default number of milliseconds to sleep between transaction sending attempts const DEFAULT_L1_TX_SENDING_SLEEP_MS: u64 = 30_000; @@ -73,6 +79,14 @@ pub struct BaseTokenAdjusterConfig { #[serde(default = "BaseTokenAdjusterConfig::default_l1_tx_sending_sleep_ms")] pub l1_tx_sending_sleep_ms: u64, + /// Maximum number of attempts to fetch quote from a remote API before failing over + #[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_max_attempts")] + pub price_fetching_max_attempts: u32, + + /// Number of milliseconds to sleep between price fetching attempts + #[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_sleep_ms")] + pub price_fetching_sleep_ms: u64, + /// Defines whether base_token_adjuster should halt the process if there was an error while /// fetching or persisting the quote. 
Generally that should be set to false to not to halt /// the server process if an external api is not available or if L1 is congested. @@ -93,6 +107,8 @@ impl Default for BaseTokenAdjusterConfig { l1_receipt_checking_sleep_ms: Self::default_l1_receipt_checking_sleep_ms(), l1_tx_sending_max_attempts: Self::default_l1_tx_sending_max_attempts(), l1_tx_sending_sleep_ms: Self::default_l1_tx_sending_sleep_ms(), + price_fetching_sleep_ms: Self::default_price_fetching_sleep_ms(), + price_fetching_max_attempts: Self::default_price_fetching_max_attempts(), halt_on_error: Self::default_halt_on_error(), } } @@ -135,6 +151,10 @@ impl BaseTokenAdjusterConfig { Duration::from_millis(self.l1_tx_sending_sleep_ms) } + pub fn price_fetching_sleep_duration(&self) -> Duration { + Duration::from_millis(self.price_fetching_sleep_ms) + } + pub fn default_l1_receipt_checking_max_attempts() -> u32 { DEFAULT_L1_RECEIPT_CHECKING_MAX_ATTEMPTS } @@ -151,6 +171,14 @@ impl BaseTokenAdjusterConfig { DEFAULT_L1_TX_SENDING_SLEEP_MS } + pub fn default_price_fetching_sleep_ms() -> u64 { + DEFAULT_PRICE_FETCHING_SLEEP_MS + } + + pub fn default_price_fetching_max_attempts() -> u32 { + DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS + } + pub fn default_max_tx_gas() -> u64 { DEFAULT_MAX_TX_GAS } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 1f4bfbc0265b..e028c3d3aec0 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -1045,6 +1045,8 @@ impl Distribution for Enc l1_receipt_checking_sleep_ms: self.sample(rng), l1_tx_sending_max_attempts: self.sample(rng), l1_tx_sending_sleep_ms: self.sample(rng), + price_fetching_max_attempts: self.sample(rng), + price_fetching_sleep_ms: self.sample(rng), halt_on_error: self.sample(rng), } } diff --git a/core/lib/env_config/src/base_token_adjuster.rs b/core/lib/env_config/src/base_token_adjuster.rs index 67cdef9425cd..f94e9c8f92a2 100644 --- a/core/lib/env_config/src/base_token_adjuster.rs +++ 
b/core/lib/env_config/src/base_token_adjuster.rs @@ -26,6 +26,8 @@ mod tests { l1_receipt_checking_sleep_ms: 20_000, l1_tx_sending_max_attempts: 10, l1_tx_sending_sleep_ms: 30_000, + price_fetching_max_attempts: 20, + price_fetching_sleep_ms: 10_000, halt_on_error: true, } } @@ -41,6 +43,8 @@ mod tests { l1_receipt_checking_sleep_ms: 30_000, l1_tx_sending_max_attempts: 3, l1_tx_sending_sleep_ms: 30_000, + price_fetching_max_attempts: 3, + price_fetching_sleep_ms: 5_000, halt_on_error: false, } } @@ -58,6 +62,8 @@ mod tests { BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS=20000 BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS=10 BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS=30000 + BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS=20 + BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS=10000 BASE_TOKEN_ADJUSTER_HALT_ON_ERROR=true "#; lock.set_env(config); @@ -79,6 +85,8 @@ mod tests { "BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS", "BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS", "BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS", + "BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS", + "BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS", "BASE_TOKEN_ADJUSTER_HALT_ON_ERROR", ]); diff --git a/core/lib/protobuf_config/src/base_token_adjuster.rs b/core/lib/protobuf_config/src/base_token_adjuster.rs index d68db5fd9796..951feac16533 100644 --- a/core/lib/protobuf_config/src/base_token_adjuster.rs +++ b/core/lib/protobuf_config/src/base_token_adjuster.rs @@ -30,6 +30,12 @@ impl ProtoRepr for proto::BaseTokenAdjuster { l1_receipt_checking_max_attempts: self .l1_receipt_checking_max_attempts .unwrap_or(Self::Type::default_l1_receipt_checking_max_attempts()), + price_fetching_sleep_ms: self + .price_fetching_sleep_ms + .unwrap_or(Self::Type::default_price_fetching_sleep_ms()), + price_fetching_max_attempts: self + .price_fetching_max_attempts + .unwrap_or(Self::Type::default_price_fetching_max_attempts()), l1_tx_sending_max_attempts: self .l1_tx_sending_max_attempts 
.unwrap_or(Self::Type::default_l1_tx_sending_max_attempts()), @@ -47,6 +53,8 @@ impl ProtoRepr for proto::BaseTokenAdjuster { l1_receipt_checking_max_attempts: Some(this.l1_receipt_checking_max_attempts), l1_tx_sending_max_attempts: Some(this.l1_tx_sending_max_attempts), l1_tx_sending_sleep_ms: Some(this.l1_tx_sending_sleep_ms), + price_fetching_max_attempts: Some(this.price_fetching_max_attempts), + price_fetching_sleep_ms: Some(this.price_fetching_sleep_ms), max_tx_gas: Some(this.max_tx_gas), default_priority_fee_per_gas: Some(this.default_priority_fee_per_gas), max_acceptable_priority_fee_in_gwei: Some(this.max_acceptable_priority_fee_in_gwei), diff --git a/core/lib/protobuf_config/src/proto/config/base_token_adjuster.proto b/core/lib/protobuf_config/src/proto/config/base_token_adjuster.proto index 1132858bfa6f..396bd400c04b 100644 --- a/core/lib/protobuf_config/src/proto/config/base_token_adjuster.proto +++ b/core/lib/protobuf_config/src/proto/config/base_token_adjuster.proto @@ -13,4 +13,6 @@ message BaseTokenAdjuster { optional uint32 l1_tx_sending_max_attempts = 8; optional uint64 l1_tx_sending_sleep_ms = 9; optional bool halt_on_error = 10; + optional uint32 price_fetching_max_attempts = 11; + optional uint64 price_fetching_sleep_ms = 12; } diff --git a/core/node/base_token_adjuster/Cargo.toml b/core/node/base_token_adjuster/Cargo.toml index c21576e37327..3a0beb2ea137 100644 --- a/core/node/base_token_adjuster/Cargo.toml +++ b/core/node/base_token_adjuster/Cargo.toml @@ -19,7 +19,8 @@ zksync_external_price_api.workspace = true zksync_contracts.workspace = true zksync_eth_client.workspace = true zksync_node_fee_model.workspace = true - +zksync_utils.workspace = true +vise.workspace = true tokio = { workspace = true, features = ["time"] } anyhow.workspace = true diff --git a/core/node/base_token_adjuster/src/base_token_ratio_persister.rs b/core/node/base_token_adjuster/src/base_token_ratio_persister.rs index 41796cf2197a..12cd6233efbb 100644 --- 
a/core/node/base_token_adjuster/src/base_token_ratio_persister.rs +++ b/core/node/base_token_adjuster/src/base_token_ratio_persister.rs @@ -1,4 +1,4 @@ -use std::{cmp::max, fmt::Debug, sync::Arc, time::Duration}; +use std::{cmp::max, fmt::Debug, sync::Arc, time::Instant}; use anyhow::Context as _; use tokio::{sync::watch, time::sleep}; @@ -14,6 +14,8 @@ use zksync_types::{ Address, U256, }; +use crate::metrics::{OperationResult, OperationResultLabels, METRICS}; + #[derive(Debug, Clone)] pub struct BaseTokenRatioPersisterL1Params { pub eth_client: Box, @@ -82,47 +84,7 @@ impl BaseTokenRatioPersister { // TODO(PE-148): Consider shifting retry upon adding external API redundancy. let new_ratio = self.retry_fetch_ratio().await?; self.persist_ratio(new_ratio).await?; - - let Some(l1_params) = &self.l1_params else { - return Ok(()); - }; - - let max_attempts = self.config.l1_tx_sending_max_attempts; - let sleep_duration = self.config.l1_tx_sending_sleep_duration(); - let mut result: anyhow::Result<()> = Ok(()); - let mut prev_base_fee_per_gas: Option = None; - let mut prev_priority_fee_per_gas: Option = None; - - for attempt in 0..max_attempts { - let (base_fee_per_gas, priority_fee_per_gas) = - self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas); - - result = self - .send_ratio_to_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas) - .await; - if let Some(err) = result.as_ref().err() { - tracing::info!( - "Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}", - attempt + 1, - base_fee_per_gas, - priority_fee_per_gas, - err - ); - tokio::time::sleep(sleep_duration).await; - prev_base_fee_per_gas = Some(base_fee_per_gas); - prev_priority_fee_per_gas = Some(priority_fee_per_gas); - } else { - tracing::info!( - "Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}", - new_ratio.numerator.get(), - 
new_ratio.denominator.get(), - base_fee_per_gas, - priority_fee_per_gas - ); - return result; - } - } - result + self.retry_update_ratio_on_l1(new_ratio).await } fn get_eth_fees( @@ -157,36 +119,110 @@ impl BaseTokenRatioPersister { (base_fee_per_gas, priority_fee_per_gas) } + async fn retry_update_ratio_on_l1(&self, new_ratio: BaseTokenAPIRatio) -> anyhow::Result<()> { + let Some(l1_params) = &self.l1_params else { + return Ok(()); + }; + + let max_attempts = self.config.l1_tx_sending_max_attempts; + let sleep_duration = self.config.l1_tx_sending_sleep_duration(); + let mut prev_base_fee_per_gas: Option = None; + let mut prev_priority_fee_per_gas: Option = None; + let mut last_error = None; + for attempt in 0..max_attempts { + let (base_fee_per_gas, priority_fee_per_gas) = + self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas); + + let start_time = Instant::now(); + let result = self + .update_ratio_on_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas) + .await; + + match result { + Ok(x) => { + tracing::info!( + "Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}", + new_ratio.numerator.get(), + new_ratio.denominator.get(), + base_fee_per_gas, + priority_fee_per_gas + ); + METRICS + .l1_gas_used + .set(x.unwrap_or(U256::zero()).low_u128() as u64); + METRICS.l1_update_latency[&OperationResultLabels { + result: OperationResult::Success, + }] + .observe(start_time.elapsed()); + + return Ok(()); + } + Err(err) => { + tracing::info!( + "Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}", + attempt, + base_fee_per_gas, + priority_fee_per_gas, + err + ); + METRICS.l1_update_latency[&OperationResultLabels { + result: OperationResult::Failure, + }] + .observe(start_time.elapsed()); + + tokio::time::sleep(sleep_duration).await; + prev_base_fee_per_gas = Some(base_fee_per_gas); + prev_priority_fee_per_gas = 
Some(priority_fee_per_gas); + last_error = Some(err) + } + } + } + + let error_message = "Failed to update base token multiplier on L1"; + Err(last_error + .map(|x| x.context(error_message)) + .unwrap_or_else(|| anyhow::anyhow!(error_message))) + } + async fn retry_fetch_ratio(&self) -> anyhow::Result { - let sleep_duration = Duration::from_secs(1); - let max_retries = 5; - let mut attempts = 0; + let sleep_duration = self.config.price_fetching_sleep_duration(); + let max_retries = self.config.price_fetching_max_attempts; + let mut last_error = None; - loop { + for attempt in 0..max_retries { + let start_time = Instant::now(); match self .price_api_client .fetch_ratio(self.base_token_address) .await { Ok(ratio) => { + METRICS.external_price_api_latency[&OperationResultLabels { + result: OperationResult::Success, + }] + .observe(start_time.elapsed()); return Ok(ratio); } - Err(err) if attempts < max_retries => { - attempts += 1; + Err(err) => { tracing::warn!( - "Attempt {}/{} to fetch ratio from coingecko failed with err: {}. Retrying...", - attempts, + "Attempt {}/{} to fetch ratio from external price api failed with err: {}. 
Retrying...", + attempt, max_retries, err ); + last_error = Some(err); + METRICS.external_price_api_latency[&OperationResultLabels { + result: OperationResult::Failure, + }] + .observe(start_time.elapsed()); sleep(sleep_duration).await; } - Err(err) => { - return Err(err) - .context("Failed to fetch base token ratio after multiple attempts"); - } } } + let error_message = "Failed to fetch base token ratio after multiple attempts"; + Err(last_error + .map(|x| x.context(error_message)) + .unwrap_or_else(|| anyhow::anyhow!(error_message))) } async fn persist_ratio(&self, api_ratio: BaseTokenAPIRatio) -> anyhow::Result { @@ -209,13 +245,13 @@ impl BaseTokenRatioPersister { Ok(id) } - async fn send_ratio_to_l1( + async fn update_ratio_on_l1( &self, l1_params: &BaseTokenRatioPersisterL1Params, api_ratio: BaseTokenAPIRatio, base_fee_per_gas: u64, priority_fee_per_gas: u64, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let fn_set_token_multiplier = l1_params .chain_admin_contract .function("setTokenMultiplier") @@ -276,7 +312,7 @@ impl BaseTokenRatioPersister { .context("failed getting receipt for `setTokenMultiplier` transaction")?; if let Some(receipt) = maybe_receipt { if receipt.status == Some(1.into()) { - return Ok(()); + return Ok(receipt.gas_used); } return Err(anyhow::Error::msg(format!( "`setTokenMultiplier` transaction {:?} failed with status {:?}", diff --git a/core/node/base_token_adjuster/src/lib.rs b/core/node/base_token_adjuster/src/lib.rs index 332fb5f47aab..d786b440f622 100644 --- a/core/node/base_token_adjuster/src/lib.rs +++ b/core/node/base_token_adjuster/src/lib.rs @@ -5,3 +5,4 @@ pub use self::{ mod base_token_ratio_persister; mod base_token_ratio_provider; +mod metrics; diff --git a/core/node/base_token_adjuster/src/metrics.rs b/core/node/base_token_adjuster/src/metrics.rs new file mode 100644 index 000000000000..e6f6571adc1d --- /dev/null +++ b/core/node/base_token_adjuster/src/metrics.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use 
vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "operation_result", rename_all = "snake_case")] +pub(super) enum OperationResult { + Success, + Failure, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet)] +pub(crate) struct OperationResultLabels { + pub result: OperationResult, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "snapshots_creator")] +pub(crate) struct BaseTokenAdjusterMetrics { + pub l1_gas_used: Gauge, + #[metrics(buckets = Buckets::LATENCIES)] + pub external_price_api_latency: Family>, + #[metrics(buckets = Buckets::LATENCIES)] + pub l1_update_latency: Family>, +} + +#[vise::register] +pub(crate) static METRICS: vise::Global = vise::Global::new();