diff --git a/.github/release-please/manifest.json b/.github/release-please/manifest.json index a56866a8bd7..a0d1d73bdda 100644 --- a/.github/release-please/manifest.json +++ b/.github/release-please/manifest.json @@ -1,5 +1,5 @@ { - "core": "24.29.0", + "core": "25.0.0", "prover": "16.5.0", "zkstack_cli": "0.1.2" } diff --git a/Cargo.lock b/Cargo.lock index 05c26a74834..7e4cad34cf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10220,7 +10220,7 @@ dependencies = [ [[package]] name = "zksync_external_node" -version = "24.29.0" +version = "25.0.0" dependencies = [ "anyhow", "assert_matches", diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 59b49af1554..56239303cd4 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,40 @@ # Changelog +## [25.0.0](https://github.com/matter-labs/zksync-era/compare/core-v24.29.0...core-v25.0.0) (2024-10-23) + + +### ⚠ BREAKING CHANGES + +* **contracts:** integrate protocol defense changes ([#2737](https://github.com/matter-labs/zksync-era/issues/2737)) + +### Features + +* Add CoinMarketCap external API ([#2971](https://github.com/matter-labs/zksync-era/issues/2971)) ([c1cb30e](https://github.com/matter-labs/zksync-era/commit/c1cb30e59ca1d0b5fea5fe0980082aea0eb04aa2)) +* **api:** Implement eth_maxPriorityFeePerGas ([#3135](https://github.com/matter-labs/zksync-era/issues/3135)) ([35e84cc](https://github.com/matter-labs/zksync-era/commit/35e84cc03a7fdd315932fb3020fe41c95a6e4bca)) +* **api:** Make acceptable values cache lag configurable ([#3028](https://github.com/matter-labs/zksync-era/issues/3028)) ([6747529](https://github.com/matter-labs/zksync-era/commit/67475292ff770d2edd6884be27f976a4144778ae)) +* **contracts:** integrate protocol defense changes ([#2737](https://github.com/matter-labs/zksync-era/issues/2737)) ([c60a348](https://github.com/matter-labs/zksync-era/commit/c60a3482ee09b3e371163e62f49e83bc6d6f4548)) +* **external-node:** save protocol version before opening a batch ([#3136](https://github.com/matter-labs/zksync-era/issues/3136)) ([d6de4f4](https://github.com/matter-labs/zksync-era/commit/d6de4f40ddce339c760c95e2bf4b8aceb571af7f)) +* Prover e2e test ([#2975](https://github.com/matter-labs/zksync-era/issues/2975)) ([0edd796](https://github.com/matter-labs/zksync-era/commit/0edd7962429b3530ae751bd7cc947c97193dd0ca)) +* **prover:** Add min_provers and dry_run features. Improve metrics and test. ([#3129](https://github.com/matter-labs/zksync-era/issues/3129)) ([7c28964](https://github.com/matter-labs/zksync-era/commit/7c289649b7b3c418c7193a35b51c264cf4970f3c)) +* **tee_verifier:** speedup SQL query for new jobs ([#3133](https://github.com/matter-labs/zksync-era/issues/3133)) ([30ceee8](https://github.com/matter-labs/zksync-era/commit/30ceee8a48046e349ff0234ebb24d468a0e0876c)) +* vm2 tracers can access storage ([#3114](https://github.com/matter-labs/zksync-era/issues/3114)) ([e466b52](https://github.com/matter-labs/zksync-era/commit/e466b52948e3c4ed1cb5af4fd999a52028e4d216)) +* **vm:** Return compressed bytecodes from `push_transaction()` ([#3126](https://github.com/matter-labs/zksync-era/issues/3126)) ([37f209f](https://github.com/matter-labs/zksync-era/commit/37f209fec8e7cb65c0e60003d46b9ea69c43caf1)) + + +### Bug Fixes + +* **call_tracer:** Flat call tracer fixes for blocks ([#3095](https://github.com/matter-labs/zksync-era/issues/3095)) ([30ddb29](https://github.com/matter-labs/zksync-era/commit/30ddb292977340beab37a81f75c35480cbdd59d3)) +* **consensus:** preventing config update reverts ([#3148](https://github.com/matter-labs/zksync-era/issues/3148)) ([caee55f](https://github.com/matter-labs/zksync-era/commit/caee55fef4eed0ec58cceaeba277bbdedf5c6f51)) +* **en:** Return `SyncState` health check ([#3142](https://github.com/matter-labs/zksync-era/issues/3142)) ([abeee81](https://github.com/matter-labs/zksync-era/commit/abeee8190d3c3a5e577d71024bdfb30ff516ad03)) +* **external-node:** delete empty unsealed batch on EN initialization ([#3125](https://github.com/matter-labs/zksync-era/issues/3125)) ([5d5214b](https://github.com/matter-labs/zksync-era/commit/5d5214ba983823b306495d34fdd1d46abacce07a)) +* Fix counter metric type to be Counter. ([#3153](https://github.com/matter-labs/zksync-era/issues/3153)) ([08a3fe7](https://github.com/matter-labs/zksync-era/commit/08a3fe7ffd0410c51334193068649905337d5e84)) +* **mempool:** minor mempool improvements ([#3113](https://github.com/matter-labs/zksync-era/issues/3113)) ([cd16083](https://github.com/matter-labs/zksync-era/commit/cd160830a0b7ebe5af4ecbd944da1cd51af3528a)) +* **prover:** Run for zero queue to allow scaling down to 0 ([#3115](https://github.com/matter-labs/zksync-era/issues/3115)) ([bbe1919](https://github.com/matter-labs/zksync-era/commit/bbe191937fa5c5711a7164fd4f0c2ae65cda0833)) +* restore instruction count functionality ([#3081](https://github.com/matter-labs/zksync-era/issues/3081)) ([6159f75](https://github.com/matter-labs/zksync-era/commit/6159f7531a0340a69c4926c4e0325811ed7cabb8)) +* **state-keeper:** save call trace for upgrade txs ([#3132](https://github.com/matter-labs/zksync-era/issues/3132)) ([e1c363f](https://github.com/matter-labs/zksync-era/commit/e1c363f8f5e03c8d62bba1523f17b87d6a0e25ad)) +* **tee_prover:** add zstd compression ([#3144](https://github.com/matter-labs/zksync-era/issues/3144)) ([7241ae1](https://github.com/matter-labs/zksync-era/commit/7241ae139b2b6bf9a9966eaa2f22203583a3786f)) +* **tee_verifier:** correctly initialize storage for re-execution ([#3017](https://github.com/matter-labs/zksync-era/issues/3017)) ([9d88373](https://github.com/matter-labs/zksync-era/commit/9d88373f1b745c489e98e5ef542644a70e815498)) + ## [24.29.0](https://github.com/matter-labs/zksync-era/compare/core-v24.28.0...core-v24.29.0) (2024-10-14) diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index 25f2400c79b..4e3dc548cf8 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "zksync_external_node" description = "Non-validator ZKsync node" -version = "24.29.0" # x-release-please-version +version = "25.0.0" # x-release-please-version edition.workspace = true authors.workspace = true homepage.workspace = true diff --git a/core/bin/zksync_tee_prover/src/metrics.rs b/core/bin/zksync_tee_prover/src/metrics.rs index 9f535967f79..769a8bbc7e0 100644 --- a/core/bin/zksync_tee_prover/src/metrics.rs +++ b/core/bin/zksync_tee_prover/src/metrics.rs @@ -2,7 +2,7 @@ use std::time::Duration; -use vise::{Buckets, Gauge, Histogram, Metrics, Unit}; +use vise::{Buckets, Counter, Gauge, Histogram, Metrics, Unit}; #[derive(Debug, Metrics)] #[metrics(prefix = "tee_prover")] @@ -13,7 +13,7 @@ pub(crate) struct TeeProverMetrics { pub proof_generation_time: Histogram, #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub proof_submitting_time: Histogram, - pub network_errors_counter: Gauge, + pub network_errors_counter: Counter, pub last_batch_number_processed: Gauge, } diff --git a/core/bin/zksync_tee_prover/src/tee_prover.rs b/core/bin/zksync_tee_prover/src/tee_prover.rs index bb7176644e6..5d22d1e7c63 100644 --- a/core/bin/zksync_tee_prover/src/tee_prover.rs +++ b/core/bin/zksync_tee_prover/src/tee_prover.rs @@ -155,7 +155,7 @@ impl Task for TeeProver { } } Err(err) => { - METRICS.network_errors_counter.inc_by(1); + METRICS.network_errors_counter.inc(); if !err.is_retriable() || retries > config.max_retries { return Err(err.into()); } diff --git a/core/lib/dal/src/consensus_dal/mod.rs b/core/lib/dal/src/consensus_dal/mod.rs index 9515e93f2b3..4516434868c 100644 --- a/core/lib/dal/src/consensus_dal/mod.rs +++ b/core/lib/dal/src/consensus_dal/mod.rs @@ -16,10 +16,48 @@ use crate::{Core, CoreDal}; #[cfg(test)] mod tests; +/// Hash of the batch. pub fn batch_hash(info: &StoredBatchInfo) -> attester::BatchHash { attester::BatchHash(Keccak256::from_bytes(info.hash().0)) } +/// Verifies that the transition from `old` to `new` is admissible. +pub fn verify_config_transition(old: &GlobalConfig, new: &GlobalConfig) -> anyhow::Result<()> { + anyhow::ensure!( + old.genesis.chain_id == new.genesis.chain_id, + "changing chain_id is not allowed: old = {:?}, new = {:?}", + old.genesis.chain_id, + new.genesis.chain_id, + ); + // Note that it may happen that the fork number didn't change, + // in case the binary was updated to support more fields in genesis struct. + // In such a case, the old binary was not able to connect to the consensus network, + // because of the genesis hash mismatch. + // TODO: Perhaps it would be better to deny unknown fields in the genesis instead. + // It would require embedding the genesis either as a json string or protobuf bytes within + // the global config, so that the global config can be parsed with + // `deny_unknown_fields:false` while genesis would be parsed with + // `deny_unknown_fields:true`. + anyhow::ensure!( + old.genesis.fork_number <= new.genesis.fork_number, + "transition to a past fork is not allowed: old = {:?}, new = {:?}", + old.genesis.fork_number, + new.genesis.fork_number, + ); + new.genesis.verify().context("genesis.verify()")?; + // This is a temporary hack until the `consensus_genesis()` RPC is disabled. + if new + == (&GlobalConfig { + genesis: old.genesis.clone(), + registry_address: None, + seed_peers: [].into(), + }) + { + anyhow::bail!("new config is equal to truncated old config, which means that it was sourced from the wrong endpoint"); + } + Ok(()) +} + /// Storage access methods for `zksync_core::consensus` module. #[derive(Debug)] pub struct ConsensusDal<'a, 'c> { @@ -94,6 +132,8 @@ impl ConsensusDal<'_, '_> { if got == want { return Ok(()); } + verify_config_transition(got, want)?; + // If genesis didn't change, just update the config. if got.genesis == want.genesis { let s = zksync_protobuf::serde::Serialize; @@ -112,30 +152,6 @@ impl ConsensusDal<'_, '_> { txn.commit().await?; return Ok(()); } - - // Verify the genesis change. - anyhow::ensure!( - got.genesis.chain_id == want.genesis.chain_id, - "changing chain_id is not allowed: old = {:?}, new = {:?}", - got.genesis.chain_id, - want.genesis.chain_id, - ); - // Note that it may happen that the fork number didn't change, - // in case the binary was updated to support more fields in genesis struct. - // In such a case, the old binary was not able to connect to the consensus network, - // because of the genesis hash mismatch. - // TODO: Perhaps it would be better to deny unknown fields in the genesis instead. - // It would require embedding the genesis either as a json string or protobuf bytes within - // the global config, so that the global config can be parsed with - // `deny_unknown_fields:false` while genesis would be parsed with - // `deny_unknown_fields:true`. - anyhow::ensure!( - got.genesis.fork_number <= want.genesis.fork_number, - "transition to a past fork is not allowed: old = {:?}, new = {:?}", - got.genesis.fork_number, - want.genesis.fork_number, - ); - want.genesis.verify().context("genesis.verify()")?; } // Reset the consensus state. diff --git a/core/lib/web3_decl/src/namespaces/eth.rs b/core/lib/web3_decl/src/namespaces/eth.rs index 399773b845d..40cb6300cff 100644 --- a/core/lib/web3_decl/src/namespaces/eth.rs +++ b/core/lib/web3_decl/src/namespaces/eth.rs @@ -185,6 +185,9 @@ pub trait EthNamespace { newest_block: BlockNumber, reward_percentiles: Vec, ) -> RpcResult; + + #[method(name = "maxPriorityFeePerGas")] + async fn max_priority_fee_per_gas(&self) -> RpcResult; } #[cfg(feature = "server")] diff --git a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/eth.rs b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/eth.rs index cc2209a35d3..34275601375 100644 --- a/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/eth.rs +++ b/core/node/api_server/src/web3/backend_jsonrpsee/namespaces/eth.rs @@ -268,4 +268,8 @@ impl EthNamespaceServer for EthNamespace { .await .map_err(|err| self.current_method().map_err(err)) } + + async fn max_priority_fee_per_gas(&self) -> RpcResult { + Ok(self.max_priority_fee_per_gas_impl()) + } } diff --git a/core/node/api_server/src/web3/namespaces/eth.rs b/core/node/api_server/src/web3/namespaces/eth.rs index 5206cd3bc2b..ee37cb989f1 100644 --- a/core/node/api_server/src/web3/namespaces/eth.rs +++ b/core/node/api_server/src/web3/namespaces/eth.rs @@ -863,6 +863,11 @@ impl EthNamespace { } }) } + + pub fn max_priority_fee_per_gas_impl(&self) -> U256 { + // ZKsync does not require priority fee. + 0u64.into() + } } // Bogus methods. diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 518a7ebb29a..8158cc5aeb2 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -100,7 +100,12 @@ impl EN { let old = old; loop { if let Ok(new) = self.fetch_global_config(ctx).await { - if new != old { + // We verify the transition here to work around the situation + // where `consenus_global_config()` RPC fails randomly and fallback + // to `consensus_genesis()` RPC activates. + if new != old + && consensus_dal::verify_config_transition(&old, &new).is_ok() + { return Err(anyhow::format_err!( "global config changed: old {old:?}, new {new:?}" ) diff --git a/core/tests/ts-integration/tests/api/web3.test.ts b/core/tests/ts-integration/tests/api/web3.test.ts index 6f1b6c3aa6b..ceed9654df9 100644 --- a/core/tests/ts-integration/tests/api/web3.test.ts +++ b/core/tests/ts-integration/tests/api/web3.test.ts @@ -189,7 +189,8 @@ describe('web3 API compatibility tests', () => { ['eth_getCompilers', [], []], ['eth_hashrate', [], '0x0'], ['eth_mining', [], false], - ['eth_getUncleCountByBlockNumber', ['0x0'], '0x0'] + ['eth_getUncleCountByBlockNumber', ['0x0'], '0x0'], + ['eth_maxPriorityFeePerGas', [], '0x0'] ])('Should test bogus web3 methods (%s)', async (method: string, input: string[], output: string) => { await expect(alice.provider.send(method, input)).resolves.toEqual(output); }); @@ -271,7 +272,8 @@ describe('web3 API compatibility tests', () => { const eip1559ApiReceipt = await alice.provider.getTransaction(eip1559Tx.hash); expect(eip1559ApiReceipt.maxFeePerGas).toEqual(eip1559Tx.maxFeePerGas!); - expect(eip1559ApiReceipt.maxPriorityFeePerGas).toEqual(eip1559Tx.maxPriorityFeePerGas!); + // `ethers` will use value provided by `eth_maxPriorityFeePerGas`, and we return 0 there. + expect(eip1559ApiReceipt.maxPriorityFeePerGas).toEqual(0n); }); test('Should test getFilterChanges for pending transactions', async () => { diff --git a/etc/env/file_based/overrides/mainnet.yaml b/etc/env/file_based/overrides/mainnet.yaml index 0600abf694c..7565aac869a 100644 --- a/etc/env/file_based/overrides/mainnet.yaml +++ b/etc/env/file_based/overrides/mainnet.yaml @@ -1,5 +1,6 @@ state_keeper: - block_commit_deadline_ms: 3600000 + # Default batch seal time deadline: 8 hours + block_commit_deadline_ms: 28000000 minimal_l2_gas_price: 45250000 eth: sender: diff --git a/etc/env/file_based/overrides/testnet.yaml b/etc/env/file_based/overrides/testnet.yaml index e4da1ac96e2..d36cf9fc7bc 100644 --- a/etc/env/file_based/overrides/testnet.yaml +++ b/etc/env/file_based/overrides/testnet.yaml @@ -1,5 +1,6 @@ state_keeper: - block_commit_deadline_ms: 3600000 + # Default batch seal time deadline: 8 hours + block_commit_deadline_ms: 28000000 minimal_l2_gas_price: 25000000 eth: sender: diff --git a/prover/crates/bin/prover_autoscaler/src/agent.rs b/prover/crates/bin/prover_autoscaler/src/agent.rs index 3269a43815c..f810bc41672 100644 --- a/prover/crates/bin/prover_autoscaler/src/agent.rs +++ b/prover/crates/bin/prover_autoscaler/src/agent.rs @@ -84,14 +84,14 @@ async fn get_cluster(State(app): State) -> Result, AppError> Ok(Json(cluster)) } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct ScaleDeploymentRequest { pub namespace: String, pub name: String, pub size: i32, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct ScaleRequest { pub deployments: Vec, } diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs index c25b624b5d4..b800b86f3c2 100644 --- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs +++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs @@ -45,6 +45,8 @@ pub struct Cluster { #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct Clusters { pub clusters: HashMap, + /// Map from cluster to index in agent URLs Vec. + pub agent_ids: HashMap, } #[derive(Default, Debug, EnumString, Display, Hash, PartialEq, Eq, Clone, Copy)] diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index f10902f5dd2..884174562a1 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -8,6 +8,7 @@ use zksync_config::configs::prover_autoscaler::{Gpu, ProverAutoscalerScalerConfi use super::{queuer, watcher}; use crate::{ + agent::{ScaleDeploymentRequest, ScaleRequest}, cluster_types::{Cluster, Clusters, Pod, PodStatus}, metrics::AUTOSCALER_METRICS, task_wiring::Task, @@ -48,6 +49,16 @@ static PROVER_DEPLOYMENT_RE: Lazy = static PROVER_POD_RE: Lazy = Lazy::new(|| Regex::new(r"^circuit-prover-gpu(-(?[ltvpa]\d+))?").unwrap()); +/// gpu_to_prover converts Gpu type to corresponding deployment name. +fn gpu_to_prover(gpu: Gpu) -> String { + let s = "circuit-prover-gpu"; + match gpu { + Gpu::Unknown => "".into(), + Gpu::L4 => s.into(), + _ => format!("{}-{}", s, gpu.to_string().to_lowercase()), + } +} + pub struct Scaler { /// namespace to Protocol Version configuration. namespaces: HashMap, @@ -299,6 +310,47 @@ impl Scaler { } } +fn diff( + namespace: &str, + provers: HashMap, + clusters: &Clusters, + requests: &mut HashMap, +) { + provers + .into_iter() + .for_each(|(GPUPoolKey { cluster, gpu }, n)| { + let prover = gpu_to_prover(gpu); + clusters + .clusters + .get(&cluster) + .and_then(|c| c.namespaces.get(namespace)) + .and_then(|ns| ns.deployments.get(&prover)) + .map_or_else( + || { + tracing::error!( + "Wasn't able to find deployment {} in cluster {}, namespace {}", + prover, + cluster, + namespace + ) + }, + |d| { + if d.desired != n as i32 { + requests + .entry(cluster.clone()) + .or_default() + .deployments + .push(ScaleDeploymentRequest { + namespace: namespace.into(), + name: prover.clone(), + size: n as i32, + }); + } + }, + ); + }) +} + /// is_namespace_running returns true if there are some pods running in it. fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool { clusters @@ -309,7 +361,7 @@ fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool { .flat_map(|v| v.deployments.values()) .map( |d| d.running + d.desired, // If there is something running or expected to run, we - // should consider the namespace. + // should re-evaluate the namespace. ) .sum::() > 0 @@ -320,24 +372,32 @@ impl Task for Scaler { async fn invoke(&self) -> anyhow::Result<()> { let queue = self.queuer.get_queue().await.unwrap(); - let guard = self.watcher.data.lock().await; - if let Err(err) = watcher::check_is_ready(&guard.is_ready) { - AUTOSCALER_METRICS.clusters_not_ready.inc(); - tracing::warn!("Skipping Scaler run: {}", err); - return Ok(()); - } + let mut scale_requests: HashMap = HashMap::new(); + { + let guard = self.watcher.data.lock().await; // Keeping the lock during all calls of run() for + // consitency. + if let Err(err) = watcher::check_is_ready(&guard.is_ready) { + AUTOSCALER_METRICS.clusters_not_ready.inc(); + tracing::warn!("Skipping Scaler run: {}", err); + return Ok(()); + } - for (ns, ppv) in &self.namespaces { - let q = queue.queue.get(ppv).cloned().unwrap_or(0); - tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}"); - if q > 0 || is_namespace_running(ns, &guard.clusters) { - let provers = self.run(ns, q, &guard.clusters); - for (k, num) in &provers { - AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] - .set(*num as u64); + for (ns, ppv) in &self.namespaces { + let q = queue.queue.get(ppv).cloned().unwrap_or(0); + tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}"); + if q > 0 || is_namespace_running(ns, &guard.clusters) { + let provers = self.run(ns, q, &guard.clusters); + for (k, num) in &provers { + AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] + .set(*num as u64); + } + diff(ns, provers, &guard.clusters, &mut scale_requests); } - // TODO: compare before and desired, send commands [cluster,namespace,deployment] -> provers } + } // Unlock self.watcher.data. + + if let Err(err) = self.watcher.send_scale(scale_requests).await { + tracing::error!("Failed scale request: {}", err); } Ok(()) @@ -401,6 +461,7 @@ mod tests { }, )] .into(), + ..Default::default() }, ), [( @@ -467,6 +528,7 @@ mod tests { ) ] .into(), + ..Default::default() }, ), [ @@ -552,6 +614,7 @@ mod tests { ) ] .into(), + ..Default::default() }, ), [ @@ -662,6 +725,7 @@ mod tests { ) ] .into(), + ..Default::default() }, ), [ diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs index 646b320e12d..6e02c0fe2fd 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -2,12 +2,16 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Context, Ok, Result}; use futures::future; -use reqwest::Method; +use reqwest::{ + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, + Method, +}; use tokio::sync::Mutex; use url::Url; use zksync_utils::http_with_retries::send_request_with_retries; use crate::{ + agent::{ScaleRequest, ScaleResponse}, cluster_types::{Cluster, Clusters}, metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}, task_wiring::Task, @@ -51,13 +55,96 @@ impl Watcher { }) .collect(), data: Arc::new(Mutex::new(WatchedData { - clusters: Clusters { - clusters: HashMap::new(), - }, + clusters: Clusters::default(), is_ready: vec![false; size], })), } } + + pub async fn send_scale(&self, requests: HashMap) -> anyhow::Result<()> { + let id_requests: HashMap; + { + // Convert cluster names into ids. Holding the data lock. + let guard = self.data.lock().await; + id_requests = requests + .into_iter() + .filter_map(|(cluster, scale_request)| { + guard.clusters.agent_ids.get(&cluster).map_or_else( + || { + tracing::error!("Failed to find id for cluster {}", cluster); + None + }, + |id| Some((*id, scale_request)), + ) + }) + .collect(); + } + + let handles: Vec<_> = id_requests + .into_iter() + .map(|(id, sr)| { + let url: String = self.cluster_agents[id] + .clone() + .join("/scale") + .unwrap() + .to_string(); + tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr); + tokio::spawn(async move { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + let response = send_request_with_retries( + &url, + MAX_RETRIES, + Method::POST, + Some(headers), + Some(serde_json::to_vec(&sr)?), + ) + .await; + let response = response.map_err(|err| { + AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc(); + anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}") + })?; + AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc(); + let response = response + .json::() + .await + .context("Failed to read response as json"); + Ok((id, response)) + }) + }) + .collect(); + + future::try_join_all( + future::join_all(handles) + .await + .into_iter() + .map(|h| async move { + let (id, res) = h??; + + let errors: Vec<_> = res + .expect("failed to do request to Agent") + .scale_result + .iter() + .filter_map(|e| { + if !e.is_empty() { + Some(format!("Agent {} failed to scale: {}", id, e)) + } else { + None + } + }) + .collect(); + + if !errors.is_empty() { + return Err(anyhow!(errors.join(";"))); + } + Ok(()) + }) + .collect::>(), + ) + .await?; + + Ok(()) + } } #[async_trait::async_trait] @@ -102,6 +189,7 @@ impl Task for Watcher { let (i, res) = h??; let c = res?; let mut guard = self.data.lock().await; + guard.clusters.agent_ids.insert(c.name.clone(), i); guard.clusters.clusters.insert(c.name.clone(), c); guard.is_ready[i] = true; Ok(()) diff --git a/zkstack_cli/crates/common/src/term/spinner.rs b/zkstack_cli/crates/common/src/term/spinner.rs index b97ba075ac4..3ec2631804a 100644 --- a/zkstack_cli/crates/common/src/term/spinner.rs +++ b/zkstack_cli/crates/common/src/term/spinner.rs @@ -1,34 +1,40 @@ -use std::time::Instant; +use std::{fmt::Display, io::IsTerminal, time::Instant}; use cliclack::{spinner, ProgressBar}; -use crate::config::global_config; +use crate::{config::global_config, logger}; /// Spinner is a helper struct to show a spinner while some operation is running. pub struct Spinner { msg: String, - pb: ProgressBar, + output: SpinnerOutput, time: Instant, } impl Spinner { /// Create a new spinner with a message. pub fn new(msg: &str) -> Self { - let pb = spinner(); - pb.start(msg); - if global_config().verbose { - pb.stop(msg); - } + let output = if std::io::stdout().is_terminal() { + let pb = spinner(); + pb.start(msg); + if global_config().verbose { + pb.stop(msg); + } + SpinnerOutput::Progress(pb) + } else { + logger::info(msg); + SpinnerOutput::Plain() + }; Spinner { msg: msg.to_owned(), - pb, + output, time: Instant::now(), } } /// Manually finish the spinner. pub fn finish(self) { - self.pb.stop(format!( + self.output.stop(format!( "{} done in {} secs", self.msg, self.time.elapsed().as_secs_f64() @@ -37,7 +43,7 @@ impl Spinner { /// Interrupt the spinner with a failed message. pub fn fail(self) { - self.pb.error(format!( + self.output.error(format!( "{} failed in {} secs", self.msg, self.time.elapsed().as_secs_f64() @@ -46,6 +52,33 @@ impl Spinner { /// Freeze the spinner with current message. pub fn freeze(self) { - self.pb.stop(self.msg); + self.output.stop(self.msg); + } +} + +/// An abstraction that makes interactive progress bar optional in environments where virtual +/// terminal is not available. +/// +/// Uses plain `logger::{info,error}` as the fallback. +/// +/// See https://github.com/console-rs/indicatif/issues/530 for more details. +enum SpinnerOutput { + Progress(ProgressBar), + Plain(), +} + +impl SpinnerOutput { + fn error(&self, msg: impl Display) { + match self { + SpinnerOutput::Progress(pb) => pb.error(msg), + SpinnerOutput::Plain() => logger::error(msg), + } + } + + fn stop(self, msg: impl Display) { + match self { + SpinnerOutput::Progress(pb) => pb.stop(msg), + SpinnerOutput::Plain() => logger::info(msg), + } } }