diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs
index 41131fc1b8c..6f83f0d2d18 100644
--- a/core/lib/config/src/configs/prover_autoscaler.rs
+++ b/core/lib/config/src/configs/prover_autoscaler.rs
@@ -51,6 +51,8 @@ pub struct ProverAutoscalerScalerConfig {
     pub cluster_priorities: HashMap<String, u32>,
     /// Prover speed per GPU. Used to calculate desired number of provers for queue size.
     pub prover_speed: HashMap<Gpu, u32>,
+    /// Maximum number of provers which can be run per cluster/GPU.
+    pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
     /// Duration after which pending pod considered long pending.
     #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
     pub long_pending_duration: Duration,
diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
index e1d11b94d8f..8363b625119 100644
--- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
+++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
@@ -34,6 +34,11 @@ message ProverSpeed {
     optional uint32 speed = 2; // required
 }
 
+message MaxProver {
+    optional string cluster_and_gpu = 1; // required, format: <cluster>/<gpu>
+    optional uint32 max = 2; // required
+}
+
 message ProverAutoscalerScalerConfig {
     optional uint32 prometheus_port = 1; // required
     optional std.Duration scaler_run_interval = 2; // optional
@@ -43,4 +48,5 @@ message ProverAutoscalerScalerConfig {
     repeated ClusterPriority cluster_priorities = 6; // optional
     repeated ProverSpeed prover_speed = 7; // optional
     optional uint32 long_pending_duration_s = 8; // optional
+    repeated MaxProver max_provers = 9; // optional
 }
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
index f7da099cb82..e95e4003972 100644
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ b/core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use anyhow::Context as _;
 use time::Duration;
 use zksync_config::configs::{self, prover_autoscaler::Gpu};
@@ -92,6 +94,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 Some(s) => Duration::seconds(s.into()),
                 None => Self::Type::default_long_pending_duration(),
             },
+            max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| {
+                let (cluster_and_gpu, max) = e.read().expect("max_provers");
+                if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
+                    acc.entry(cluster.to_string())
+                        .or_default()
+                        .insert(gpu.parse().expect("max_provers/gpu"), max);
+                }
+                acc
+            }),
         })
     }
 
@@ -117,6 +128,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 .map(|(k, v)| proto::ProverSpeed::build(&(*k, *v)))
                 .collect(),
             long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32),
+            max_provers: this
+                .max_provers
+                .iter()
+                .flat_map(|(cluster, inner_map)| {
+                    inner_map.iter().map(move |(gpu, max)| {
+                        proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max))
+                    })
+                })
+                .collect(),
         }
     }
 }
@@ -170,3 +190,21 @@ impl ProtoRepr for proto::ProverSpeed {
         }
     }
 }
+
+impl ProtoRepr for proto::MaxProver {
+    type Type = (String, u32);
+    fn read(&self) -> anyhow::Result<Self::Type> {
+        Ok((
+            required(&self.cluster_and_gpu)
+                .context("cluster_and_gpu")?
+                .parse()?,
+            *required(&self.max).context("max")?,
+        ))
+    }
+    fn build(this: &Self::Type) -> Self {
+        Self {
+            cluster_and_gpu: Some(this.0.to_string()),
+            max: Some(this.1),
+        }
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
index b074e0774c9..c25b624b5d4 100644
--- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
+++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -36,19 +36,11 @@ pub struct Namespace {
     pub pods: HashMap<String, Pod>,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct Cluster {
     pub name: String,
     pub namespaces: HashMap<String, Namespace>,
 }
-impl Default for Cluster {
-    fn default() -> Self {
-        Self {
-            name: "".to_string(),
-            namespaces: HashMap::new(),
-        }
-    }
-}
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
 pub struct Clusters {
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index 9f37c4d1167..75c9e2e3e42 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -56,6 +56,7 @@ pub struct Scaler {
 
     /// Which cluster to use first.
     cluster_priorities: HashMap<String, u32>,
+    max_provers: HashMap<String, HashMap<Gpu, u32>>,
     prover_speed: HashMap<Gpu, u32>,
     long_pending_duration: chrono::Duration,
 }
@@ -87,6 +88,7 @@ impl Scaler {
             watcher,
             queuer,
             cluster_priorities: config.cluster_priorities,
+            max_provers: config.max_provers,
             prover_speed: config.prover_speed,
             long_pending_duration: chrono::Duration::seconds(
                 config.long_pending_duration.whole_seconds(),
             ),
@@ -112,7 +114,12 @@ impl Scaler {
             let e = gp_map.entry(gpu).or_insert(GPUPool {
                 name: cluster.name.clone(),
                 gpu,
-                max_pool_size: 100, // TODO: get from the agent.
+                max_pool_size: self
+                    .max_provers
+                    .get(&cluster.name)
+                    .and_then(|inner_map| inner_map.get(&gpu))
+                    .copied()
+                    .unwrap_or(0),
                 ..Default::default()
             });
 
@@ -265,23 +272,46 @@ impl Scaler {
             }
         }
 
-        tracing::debug!("run result: provers {:?}, total: {}", &provers, total);
+        tracing::debug!(
+            "run result for namespace {}: provers {:?}, total: {}",
+            namespace,
+            &provers,
+            total
+        );
         provers
     }
 }
 
+/// is_namespace_running returns true if there are some pods running in it.
+fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
+    clusters
+        .clusters
+        .values()
+        .flat_map(|v| v.namespaces.iter())
+        .filter_map(|(k, v)| if k == namespace { Some(v) } else { None })
+        .flat_map(|v| v.deployments.values())
+        .map(
+            |d| d.running + d.desired, // If there is something running or expected to run, we
+                                       // should consider the namespace.
+        )
+        .sum::<i32>()
+        > 0
+}
+
 #[async_trait::async_trait]
 impl Task for Scaler {
     async fn invoke(&self) -> anyhow::Result<()> {
         let queue = self.queuer.get_queue().await.unwrap();
 
-        // TODO: Check that clusters data is ready.
-        let clusters = self.watcher.clusters.lock().await;
+        let guard = self.watcher.data.lock().await;
+        watcher::check_is_ready(&guard.is_ready)?;
+
         for (ns, ppv) in &self.namespaces {
             let q = queue.queue.get(ppv).cloned().unwrap_or(0);
-            if q > 0 {
-                let provers = self.run(ns, q, &clusters);
+            tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
+            if q > 0 || is_namespace_running(ns, &guard.clusters) {
+                let provers = self.run(ns, q, &guard.clusters);
                 for (k, num) in &provers {
                     AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
                         .set(*num as u64);
@@ -302,7 +332,7 @@ mod tests {
 
     use super::*;
     use crate::{
-        cluster_types::{self, Deployment, Namespace, Pod},
+        cluster_types::{Deployment, Namespace, Pod},
        global::{queuer, watcher},
     };
 
@@ -310,14 +340,19 @@
     fn test_run() {
         let watcher = watcher::Watcher {
             cluster_agents: vec![],
-            clusters: Arc::new(Mutex::new(cluster_types::Clusters {
-                ..Default::default()
-            })),
+            data: Arc::new(Mutex::new(watcher::WatchedData::default())),
         };
         let queuer = queuer::Queuer {
             prover_job_monitor_url: "".to_string(),
         };
-        let scaler = Scaler::new(watcher, queuer, ProverAutoscalerScalerConfig::default());
+        let scaler = Scaler::new(
+            watcher,
+            queuer,
+            ProverAutoscalerScalerConfig {
+                max_provers: HashMap::from([("foo".to_string(), HashMap::from([(Gpu::L4, 100)]))]),
+                ..Default::default()
+            },
+        );
         let got = scaler.run(
             &"prover".to_string(),
             1499,
@@ -355,6 +390,6 @@
             },
             3,
         )]);
-        assert!(got == want);
+        assert_eq!(got, want);
     }
 }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
index ef3ebd3b819..01fa68c60f8 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -1,6 +1,6 @@
 use std::{collections::HashMap, sync::Arc};
 
-use anyhow::{Context, Ok};
+use anyhow::{anyhow, Context, Ok, Result};
 use futures::future;
 use reqwest::Method;
 use tokio::sync::Mutex;
@@ -12,15 +12,31 @@ use crate::{
     task_wiring::Task,
 };
 
+#[derive(Default)]
+pub struct WatchedData {
+    pub clusters: Clusters,
+    pub is_ready: Vec<bool>,
+}
+
+pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
+    for b in v {
+        if !b {
+            return Err(anyhow!("Clusters data is not ready"));
+        }
+    }
+    Ok(())
+}
+
 #[derive(Clone)]
 pub struct Watcher {
     /// List of base URLs of all agents.
     pub cluster_agents: Vec<Arc<Url>>,
-    pub clusters: Arc<Mutex<Clusters>>,
+    pub data: Arc<Mutex<WatchedData>>,
 }
 
 impl Watcher {
     pub fn new(agent_urls: Vec<String>) -> Self {
+        let size = agent_urls.len();
         Self {
             cluster_agents: agent_urls
                 .into_iter()
                 .map(|url| {
                     )
                 })
                 .collect(),
-            clusters: Arc::new(Mutex::new(Clusters {
-                clusters: HashMap::new(),
+            data: Arc::new(Mutex::new(WatchedData {
+                clusters: Clusters {
+                    clusters: HashMap::new(),
+                },
+                is_ready: vec![false; size],
             })),
         }
     }
@@ -45,7 +64,8 @@ impl Task for Watcher {
             .cluster_agents
             .clone()
             .into_iter()
-            .map(|a| {
+            .enumerate()
+            .map(|(i, a)| {
                 tracing::debug!("Getting cluster data from agent {}.", a);
                 tokio::spawn(async move {
                     let url: String = a
                         .to_string();
                     let response = send_request_with_retries(&url, 5, Method::GET, None, None).await;
-                    response
+                    let res = response
                         .map_err(|err| {
                             anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                         })?
                         .json::<Cluster>()
                         .await
-                        .context("Failed to read response as json")
+                        .context("Failed to read response as json");
+                    Ok((i, res))
                 })
             })
             .collect();
@@ -71,18 +92,16 @@
                 .await
                 .into_iter()
                 .map(|h| async move {
-                    let c = h.unwrap().unwrap();
-                    self.clusters
-                        .lock()
-                        .await
-                        .clusters
-                        .insert(c.name.clone(), c);
+                    let (i, res) = h??;
+                    let c = res?;
+                    let mut guard = self.data.lock().await;
+                    guard.clusters.clusters.insert(c.name.clone(), c);
+                    guard.is_ready[i] = true;
                     Ok(())
                 })
                 .collect::<Vec<_>>(),
         )
-        .await
-        .unwrap();
+        .await?;
 
         Ok(())
     }
diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs
index 196bd6deb81..e3aec1fbd39 100644
--- a/prover/crates/bin/prover_autoscaler/src/main.rs
+++ b/prover/crates/bin/prover_autoscaler/src/main.rs
@@ -80,24 +80,21 @@ async fn main() -> anyhow::Result<()> {
     let _ = rustls::crypto::ring::default_provider().install_default();
     let client = kube::Client::try_default().await?;
 
-    tracing::info!("Starting ProverAutoscaler");
-
     let mut tasks = vec![];
 
     match opt.job {
         AutoscalerType::Agent => {
+            let cluster = opt
+                .cluster_name
+                .context("cluster_name is required for Agent")?;
+            tracing::info!("Starting ProverAutoscaler Agent for cluster {}", cluster);
             let agent_config = general_config.agent_config.context("agent_config")?;
             let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
             tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
 
             // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
             // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
-            let watcher = Watcher::new(
-                client.clone(),
-                opt.cluster_name
-                    .context("cluster_name is required for Agent")?,
-                agent_config.namespaces,
-            );
+            let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces);
             let scaler = Scaler { client };
             tasks.push(tokio::spawn(watcher.clone().run()));
             tasks.push(tokio::spawn(agent::run_server(
@@ -108,6 +105,7 @@
             )))
         }
         AutoscalerType::Scaler => {
+            tracing::info!("Starting ProverAutoscaler Scaler");
             let scaler_config = general_config.scaler_config.context("scaler_config")?;
             let interval = scaler_config.scaler_run_interval.unsigned_abs();
             let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);