From fa22fd6cc92b593f1d058f27888a9f09a680528c Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Fri, 25 Oct 2024 17:13:49 +0200 Subject: [PATCH 1/8] Cleanup unneeded pods. More tests. ref ZKD-1855 --- .../prover_autoscaler/src/cluster_types.rs | 1 + .../prover_autoscaler/src/global/scaler.rs | 124 +++++++++++++++++- .../bin/prover_autoscaler/src/k8s/watcher.rs | 5 + 3 files changed, 129 insertions(+), 1 deletion(-) diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs index e3e4c9b4df0..733615fad08 100644 --- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs +++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs @@ -40,6 +40,7 @@ pub struct Namespace { #[serde(serialize_with = "ordered_map")] pub deployments: HashMap, pub pods: HashMap, + #[serde(default)] pub scale_errors: Vec, } diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index eb4249d071f..809bc32d7e9 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -421,7 +421,7 @@ impl Task for Scaler { mod tests { use super::*; use crate::{ - cluster_types::{Deployment, Namespace, Pod}, + cluster_types::{Deployment, Namespace, Pod, ScaleEvent}, global::{queuer, watcher}, }; @@ -765,4 +765,126 @@ mod tests { "Min 2 provers, 5 running" ); } + + #[tracing_test::traced_test] + #[test] + fn test_run_need_move() { + let scaler = Scaler::new( + watcher::Watcher::default(), + queuer::Queuer::default(), + ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover".into(), 2)].into(), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + long_pending_duration: ProverAutoscalerScalerConfig::default_long_pending_duration( + ), + 
..Default::default() + }, + ); + + assert_eq!( + scaler.run( + &"prover".into(), + 1400, + &Clusters { + clusters: [ + ( + "foo".into(), + Cluster { + name: "foo".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + running: 3, + desired: 3, + }, + )] + .into(), + pods: [ + ( + "circuit-prover-gpu-7c5f8fc747-gmtcr".into(), + Pod { + status: "Running".into(), + changed: Utc::now(), + ..Default::default() + }, + ), + ( + "circuit-prover-gpu-7c5f8fc747-gmtc2".into(), + Pod { + status: "Pending".into(), + changed: Utc::now(), + ..Default::default() + }, + ), + ( + "circuit-prover-gpu-7c5f8fc747-gmtc3".into(), + Pod { + status: "Running".into(), + changed: Utc::now(), + ..Default::default() + }, + ) + ] + .into(), + scale_errors: vec![ScaleEvent { + name: "circuit-prover-gpu-7c5f8fc747-gmtc2.123456" + .into(), + time: Utc::now() - chrono::Duration::hours(1) + }], + }, + )] + .into(), + }, + ), + ( + "bar".into(), + Cluster { + name: "bar".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment::default(), + )] + .into(), + ..Default::default() + }, + )] + .into(), + }, + ) + ] + .into(), + ..Default::default() + }, + ), + [ + ( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, + }, + 2, + ), + ( + GPUPoolKey { + cluster: "bar".into(), + gpu: Gpu::L4, + }, + 1, + ) + ] + .into(), + "Move 1 prover to bar" + ); + } } diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs index 5384db082bc..707ff04f183 100644 --- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs @@ -134,6 +134,11 @@ impl Watcher { } pod.status = phase; + if pod.status == "Succeeded" || pod.status == "Failed" { + // Cleaning up list of pods. 
+ v.pods.remove(&p.name_any()); + } + tracing::info!("Got pod: {}", p.name_any()) } Watched::Event(e) => { From 3b5b2518a6a60130642af37ab1bba14ddb09129b Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Mon, 28 Oct 2024 11:53:31 +0100 Subject: [PATCH 2/8] feat(prover): Add support for scaling WGs and compressor. ref ZKD-1855 --- .../config/src/configs/prover_autoscaler.rs | 43 +++ .../src/proto/config/prover_autoscaler.proto | 15 +- .../protobuf_config/src/prover_autoscaler.rs | 61 ++++ .../prover_autoscaler/src/cluster_types.rs | 1 + .../prover_autoscaler/src/global/queuer.rs | 56 ++- .../prover_autoscaler/src/global/scaler.rs | 343 +++++++++++++++--- .../bin/prover_autoscaler/src/metrics.rs | 5 +- 7 files changed, 457 insertions(+), 67 deletions(-) diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs index b24a1a26651..d345b53e6f3 100644 --- a/core/lib/config/src/configs/prover_autoscaler.rs +++ b/core/lib/config/src/configs/prover_autoscaler.rs @@ -61,6 +61,8 @@ pub struct ProverAutoscalerScalerConfig { /// Duration after which pending pod considered long pending. #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")] pub long_pending_duration: Duration, + /// List of simple autoscaler targets. + pub scaler_targets: Vec, } #[derive( @@ -93,6 +95,41 @@ pub enum Gpu { A100, } +// TODO: generate this enum by QueueReport from https://github.com/matter-labs/zksync-era/blob/main/prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs#L23 +// and remove allowing of non_camel_case_types by generating field name parser. 
+#[derive(Debug, Display, PartialEq, Eq, Hash, Clone, Deserialize, EnumString, Default)] +#[allow(non_camel_case_types)] +pub enum QueueReportFields { + #[strum(ascii_case_insensitive)] + basic_witness_jobs, + #[strum(ascii_case_insensitive)] + leaf_witness_jobs, + #[strum(ascii_case_insensitive)] + node_witness_jobs, + #[strum(ascii_case_insensitive)] + recursion_tip_witness_jobs, + #[strum(ascii_case_insensitive)] + scheduler_witness_jobs, + #[strum(ascii_case_insensitive)] + proof_compressor_jobs, + #[default] + #[strum(ascii_case_insensitive)] + prover_jobs, +} + +/// ScalerTarget can be configured to autoscale any of services for which queue is reported by +/// prover-job-monitor, except for provers. Provers need special treatment due to GPU requirement. +#[derive(Debug, Clone, PartialEq, Deserialize, Default)] +pub struct ScalerTarget { + pub queue_report_field: QueueReportFields, + pub pod_name_prefix: String, + /// Max replicas per cluster. + pub max_replicas: HashMap, + /// The queue will be divided by the speed and rounded up to get number of replicas. 
+ #[serde(default = "ScalerTarget::default_speed")] + pub speed: usize, +} + impl ProverAutoscalerConfig { /// Default graceful shutdown timeout -- 5 seconds pub fn default_graceful_shutdown_timeout() -> Duration { @@ -126,3 +163,9 @@ impl ProverAutoscalerScalerConfig { Duration::minutes(10) } } + +impl ScalerTarget { + pub fn default_speed() -> usize { + 1 + } +} diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto index 9b7f201e9b7..0f723e22a93 100644 --- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto +++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto @@ -45,15 +45,28 @@ message MinProver { optional uint32 min = 2; // required } +message MaxReplica { + optional string cluster = 1; // required + optional uint64 max = 2; // required +} + +message ScalerTarget { + optional string queue_report_field = 1; // required + optional string pod_name_prefix = 2; // required + repeated MaxReplica max_replicas = 3; // required at least one + optional uint64 speed = 4; // optional +} + message ProverAutoscalerScalerConfig { optional uint32 prometheus_port = 1; // required optional std.Duration scaler_run_interval = 2; // optional optional string prover_job_monitor_url = 3; // required repeated string agents = 4; // required at least one - repeated ProtocolVersion protocol_versions = 5; // repeated at least one + repeated ProtocolVersion protocol_versions = 5; // required at least one repeated ClusterPriority cluster_priorities = 6; // optional repeated ProverSpeed prover_speed = 7; // optional optional uint32 long_pending_duration_s = 8; // optional repeated MaxProver max_provers = 9; // optional repeated MinProver min_provers = 10; // optional + repeated ScalerTarget scaler_targets = 11; // optional } diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs index 
51f1b162d4c..c3e7c9719f1 100644 --- a/core/lib/protobuf_config/src/prover_autoscaler.rs +++ b/core/lib/protobuf_config/src/prover_autoscaler.rs @@ -112,6 +112,12 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig { .map(|(i, e)| e.read().context(i)) .collect::>() .context("min_provers")?, + scaler_targets: self + .scaler_targets + .iter() + .enumerate() + .map(|(i, x)| x.read().context(i).unwrap()) + .collect::>(), }) } @@ -151,6 +157,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig { .iter() .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v))) .collect(), + scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(), } } } @@ -238,3 +245,57 @@ impl ProtoRepr for proto::MinProver { } } } + +impl ProtoRepr for proto::MaxReplica { + type Type = (String, usize); + fn read(&self) -> anyhow::Result { + Ok(( + required(&self.cluster).context("cluster")?.parse()?, + *required(&self.max).context("max")? as usize, + )) + } + fn build(this: &Self::Type) -> Self { + Self { + cluster: Some(this.0.to_string()), + max: Some(this.1 as u64), + } + } +} + +impl ProtoRepr for proto::ScalerTarget { + type Type = configs::prover_autoscaler::ScalerTarget; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + queue_report_field: required(&self.queue_report_field) + .and_then(|x| Ok((*x).parse()?)) + .context("queue_report_field")?, + pod_name_prefix: required(&self.pod_name_prefix) + .context("pod_name_prefix")? 
+ .clone(), + max_replicas: self + .max_replicas + .iter() + .enumerate() + .map(|(i, e)| e.read().context(i)) + .collect::>() + .context("max_replicas")?, + speed: match self.speed { + Some(x) => x as usize, + None => Self::Type::default_speed(), + }, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + queue_report_field: Some(this.queue_report_field.to_string()), + pod_name_prefix: Some(this.pod_name_prefix.clone()), + max_replicas: this + .max_replicas + .iter() + .map(|(k, v)| proto::MaxReplica::build(&(k.clone(), *v))) + .collect(), + speed: Some(this.speed as u64), + } + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs index 733615fad08..db215e570ef 100644 --- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs +++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs @@ -65,4 +65,5 @@ pub enum PodStatus { Pending, LongPending, NeedToMove, + Failed, } diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index 32610ebf3c3..d6801e0d1b5 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -2,31 +2,58 @@ use std::collections::HashMap; use anyhow::{Context, Ok}; use reqwest::Method; -use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport; +use zksync_config::configs::prover_autoscaler::{QueueReportFields, ScalerTarget}; +use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; use zksync_utils::http_with_retries::send_request_with_retries; use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}; const MAX_RETRIES: usize = 5; -#[derive(Debug)] -pub struct Queue { - pub queue: HashMap, -} +pub type Queue = HashMap<(String, QueueReportFields), u64>; #[derive(Default)] pub struct Queuer { pub prover_job_monitor_url: String, + pub targets: 
Vec, +} + +fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 { + let q = match target { + QueueReportFields::basic_witness_jobs => { + report.basic_witness_jobs.queued + report.basic_witness_jobs.in_progress + } + QueueReportFields::leaf_witness_jobs => { + report.leaf_witness_jobs.queued + report.leaf_witness_jobs.in_progress + } + QueueReportFields::node_witness_jobs => { + report.node_witness_jobs.queued + report.node_witness_jobs.in_progress + } + QueueReportFields::recursion_tip_witness_jobs => { + report.recursion_tip_witness_jobs.queued + report.recursion_tip_witness_jobs.in_progress + } + QueueReportFields::scheduler_witness_jobs => { + report.scheduler_witness_jobs.queued + report.scheduler_witness_jobs.in_progress + } + QueueReportFields::proof_compressor_jobs => { + report.proof_compressor_jobs.queued + report.proof_compressor_jobs.in_progress + } + QueueReportFields::prover_jobs => { + report.prover_jobs.queued + report.prover_jobs.in_progress + } + }; + q as u64 } impl Queuer { pub fn new(pjm_url: String) -> Self { Self { prover_job_monitor_url: pjm_url, + targets: Vec::default(), } } - pub async fn get_queue(&self) -> anyhow::Result { + pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result { let url = &self.prover_job_monitor_url; let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await; let response = response.map_err(|err| { @@ -39,11 +66,16 @@ impl Queuer { .json::>() .await .context("Failed to read response as json")?; - Ok(Queue { - queue: response - .iter() - .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64)) - .collect::>(), - }) + Ok(response + .iter() + .flat_map(|x| { + jobs.iter().map(move |j| { + ( + (x.version.to_string(), j.clone()), + target_to_queue(j.clone(), &x.report), + ) + }) + }) + .collect::>()) } } diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index 
809bc32d7e9..fdc163aea0d 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -4,7 +4,9 @@ use chrono::Utc; use debug_map_sorted::SortedOutputExt; use once_cell::sync::Lazy; use regex::Regex; -use zksync_config::configs::prover_autoscaler::{Gpu, ProverAutoscalerScalerConfig}; +use zksync_config::configs::prover_autoscaler::{ + Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget, +}; use super::{queuer, watcher}; use crate::{ @@ -65,6 +67,12 @@ pub struct Scaler { watcher: watcher::Watcher, queuer: queuer::Queuer, + jobs: Vec, + prover_scaler: GpuScaler, + simple_scalers: Vec, +} + +pub struct GpuScaler { /// Which cluster to use first. cluster_priorities: HashMap, min_provers: HashMap, @@ -73,6 +81,16 @@ pub struct Scaler { long_pending_duration: chrono::Duration, } +pub struct SimpleScaler { + queue_report_field: QueueReportFields, + pod_name_prefix: String, + /// Which cluster to use first. 
+ cluster_priorities: HashMap, + max_replicas: HashMap, + speed: usize, + long_pending_duration: chrono::Duration, +} + struct ProverPodGpu<'a> { name: &'a str, pod: &'a Pod, @@ -102,10 +120,31 @@ impl Scaler { AUTOSCALER_METRICS.prover_protocol_version[&(namespace.clone(), version.clone())] .set(1); }); + + let mut simple_scalers = Vec::default(); + let mut jobs = vec![QueueReportFields::prover_jobs]; + for c in &config.scaler_targets { + jobs.push(c.queue_report_field.clone()); + simple_scalers.push(SimpleScaler::new( + c, + config.cluster_priorities.clone(), + chrono::Duration::seconds(config.long_pending_duration.whole_seconds()), + )) + } Self { - namespaces: config.protocol_versions, + namespaces: config.protocol_versions.clone(), watcher, queuer, + jobs, + prover_scaler: GpuScaler::new(config), + simple_scalers, + } + } +} + +impl GpuScaler { + pub fn new(config: ProverAutoscalerScalerConfig) -> Self { + Self { cluster_priorities: config.cluster_priorities, min_provers: config.min_provers, max_provers: config.max_provers, @@ -116,6 +155,7 @@ impl Scaler { } } + /// Converts a single cluster into vec of GPUPools, one for each GPU. fn convert_to_gpu_pool(&self, namespace: &String, cluster: &Cluster) -> Vec { let mut gp_map = HashMap::new(); // let Some(namespace_value) = &cluster.namespaces.get(namespace) else { @@ -218,6 +258,10 @@ impl Scaler { .then(b.max_pool_size.cmp(&a.max_pool_size)) // Reverse sort by cluster size. 
}); + gpu_pools.iter().for_each(|p| { + AUTOSCALER_METRICS.scale_errors[&p.name.clone()].set(p.scale_errors as u64); + }); + gpu_pools } @@ -323,6 +367,193 @@ impl Scaler { } } +#[derive(Default, Debug, PartialEq, Eq)] +struct Pool { + name: String, + pods: HashMap, + scale_errors: usize, + max_pool_size: usize, +} + +impl Pool { + fn sum_by_pod_status(&self, ps: PodStatus) -> usize { + self.pods.get(&ps).cloned().unwrap_or(0) + } +} + +impl SimpleScaler { + pub fn new( + config: &ScalerTarget, + cluster_priorities: HashMap, + long_pending_duration: chrono::Duration, + ) -> Self { + Self { + queue_report_field: config.queue_report_field.clone(), + pod_name_prefix: config.pod_name_prefix.clone(), + cluster_priorities, + max_replicas: config.max_replicas.clone(), + speed: config.speed, + long_pending_duration, + } + } + + fn convert_to_pool(&self, namespace: &String, cluster: &Cluster) -> Option { + let Some(namespace_value) = &cluster.namespaces.get(namespace) else { + // No namespace in config, ignoring. + return None; + }; + + // TODO: Check if related deployment exists. + let mut pool = Pool { + name: cluster.name.clone(), + max_pool_size: self.max_replicas.get(&cluster.name).copied().unwrap_or(0), + scale_errors: namespace_value + .scale_errors + .iter() + .filter(|v| v.time < Utc::now() - chrono::Duration::hours(1)) // TODO Move the duration into config. + .count(), + ..Default::default() + }; + + // Initialize pool only if we have ready deployments. 
+ pool.pods.insert(PodStatus::Running, 0); + + let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap(); + for pod in namespace_value.pods.iter().filter_map(|(pod_name, pod)| { + if pod_re.is_match(pod_name) { + return Some(pod); + } + None + }) { + let mut status = PodStatus::from_str(&pod.status).unwrap_or_default(); + if status == PodStatus::Pending && pod.changed < Utc::now() - self.long_pending_duration + { + status = PodStatus::LongPending; + } + pool.pods.entry(status).and_modify(|n| *n += 1).or_insert(1); + } + + tracing::debug!("Pool pods {:?}", pool); + + Some(pool) + } + + fn sorted_clusters(&self, namespace: &String, clusters: &Clusters) -> Vec { + let mut pools: Vec = clusters + .clusters + .values() + .flat_map(|c| self.convert_to_pool(namespace, c)) + .collect(); + + pools.sort_by(|a, b| { + a.sum_by_pod_status(PodStatus::NeedToMove) + .cmp(&b.sum_by_pod_status(PodStatus::NeedToMove)) // Sort by need to evict. + .then( + a.sum_by_pod_status(PodStatus::LongPending) + .cmp(&b.sum_by_pod_status(PodStatus::LongPending)), + ) // Sort by long Pending pods. + .then(a.scale_errors.cmp(&b.scale_errors)) // Sort by scale_errors in the cluster. + .then( + self.cluster_priorities + .get(&a.name) + .unwrap_or(&1000) + .cmp(self.cluster_priorities.get(&b.name).unwrap_or(&1000)), + ) // Sort by priority. + .then(b.max_pool_size.cmp(&a.max_pool_size)) // Reverse sort by cluster size. + }); + + pools + } + + fn pods_to_speed(&self, n: usize) -> u64 { + (self.speed * n) as u64 + } + + fn normalize_queue(&self, queue: u64) -> u64 { + let speed = self.speed as u64; + // Divide and round up if there's any remainder. 
+ (queue + speed - 1) / speed * speed + } + + fn run(&self, namespace: &String, queue: u64, clusters: &Clusters) -> HashMap { + let sorted_clusters = self.sorted_clusters(namespace, clusters); + tracing::debug!( + "Sorted clusters for namespace {}: {:?}", + namespace, + &sorted_clusters + ); + + let mut total: i64 = 0; + let mut pods: HashMap = HashMap::new(); + for cluster in &sorted_clusters { + for (status, replicas) in &cluster.pods { + match status { + PodStatus::Running | PodStatus::Pending => { + total += self.pods_to_speed(*replicas) as i64; + pods.entry(cluster.name.clone()) + .and_modify(|x| *x += replicas) + .or_insert(*replicas); + } + _ => (), // Ignore LongPending as not running here. + } + } + } + + // Remove unneeded pods. + if (total as u64) > self.normalize_queue(queue) { + for cluster in sorted_clusters.iter().rev() { + let mut excess_queue = total as u64 - self.normalize_queue(queue); + let mut excess_pods = excess_queue as usize / self.speed; + let replicas = pods.entry(cluster.name.clone()).or_default(); + if *replicas < excess_pods { + excess_pods = *replicas; + excess_queue = *replicas as u64 * self.speed as u64; + } + *replicas -= excess_pods; + total -= excess_queue as i64; + if total <= 0 { + break; + }; + } + } + + // Reduce load in over capacity pools. + for cluster in &sorted_clusters { + let replicas = pods.entry(cluster.name.clone()).or_default(); + if cluster.max_pool_size < *replicas { + let excess = *replicas - cluster.max_pool_size; + total -= (excess * self.speed) as i64; + *replicas -= excess; + } + } + + tracing::debug!("Queue covered with provers: {}", total); + // Add required pods. 
+ if (total as u64) < queue { + for cluster in &sorted_clusters { + let mut required_queue = queue - total as u64; + let mut required_pods = self.normalize_queue(required_queue) as usize / self.speed; + let replicas = pods.entry(cluster.name.clone()).or_default(); + if *replicas + required_pods > cluster.max_pool_size { + required_pods = cluster.max_pool_size - *replicas; + required_queue = (required_pods * self.speed) as u64; + } + *replicas += required_pods; + total += required_queue as i64; + } + } + + tracing::debug!( + "run result for namespace {}: provers {:?}, total: {}", + namespace, + &pods, + total + ); + + pods + } +} + fn diff( namespace: &str, provers: HashMap, @@ -383,7 +614,7 @@ fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool { #[async_trait::async_trait] impl Task for Scaler { async fn invoke(&self) -> anyhow::Result<()> { - let queue = self.queuer.get_queue().await.unwrap(); + let queue = self.queuer.get_queue(&self.jobs).await.unwrap(); let mut scale_requests: HashMap = HashMap::new(); { @@ -396,16 +627,38 @@ impl Task for Scaler { } for (ns, ppv) in &self.namespaces { - let q = queue.queue.get(ppv).cloned().unwrap_or(0); + // Prover + let q = queue + .get(&(ppv.to_string(), QueueReportFields::prover_jobs)) + .cloned() + .unwrap_or(0); tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}"); if q > 0 || is_namespace_running(ns, &guard.clusters) { - let provers = self.run(ns, q, &guard.clusters); + let provers = self.prover_scaler.run(ns, q, &guard.clusters); for (k, num) in &provers { AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] .set(*num as u64); } diff(ns, provers, &guard.clusters, &mut scale_requests); } + + // Simple Scalers. 
+ for scaler in &self.simple_scalers { + let q = queue + .get(&(ppv.to_string(), scaler.queue_report_field.clone())) + .cloned() + .unwrap_or(0); + tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.pod_name_prefix); + if q > 0 || is_namespace_running(ns, &guard.clusters) { + let pods = scaler.run(ns, q, &guard.clusters); + for (k, num) in &pods { + AUTOSCALER_METRICS.jobs + [&(scaler.pod_name_prefix.clone(), k.clone(), ns.clone())] + .set(*num as u64); + } + // TODO: diff and add into scale_requests. + } + } } } // Unlock self.watcher.data. @@ -420,28 +673,21 @@ impl Task for Scaler { #[cfg(test)] mod tests { use super::*; - use crate::{ - cluster_types::{Deployment, Namespace, Pod, ScaleEvent}, - global::{queuer, watcher}, - }; + use crate::cluster_types::{Deployment, Namespace, Pod, ScaleEvent}; #[tracing_test::traced_test] #[test] fn test_run() { - let scaler = Scaler::new( - watcher::Watcher::default(), - queuer::Queuer::default(), - ProverAutoscalerScalerConfig { - cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), - min_provers: [("prover-other".into(), 2)].into(), - max_provers: [ - ("foo".into(), [(Gpu::L4, 100)].into()), - ("bar".into(), [(Gpu::L4, 100)].into()), - ] - .into(), - ..Default::default() - }, - ); + let scaler = GpuScaler::new(ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover-other".into(), 2)].into(), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + ..Default::default() + }); assert_eq!( scaler.run( @@ -570,20 +816,16 @@ mod tests { #[tracing_test::traced_test] #[test] fn test_run_min_provers() { - let scaler = Scaler::new( - watcher::Watcher::default(), - queuer::Queuer::default(), - ProverAutoscalerScalerConfig { - cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), - min_provers: [("prover".into(), 2)].into(), 
- max_provers: [ - ("foo".into(), [(Gpu::L4, 100)].into()), - ("bar".into(), [(Gpu::L4, 100)].into()), - ] - .into(), - ..Default::default() - }, - ); + let scaler = GpuScaler::new(ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover".into(), 2)].into(), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + ..Default::default() + }); assert_eq!( scaler.run( @@ -769,22 +1011,17 @@ mod tests { #[tracing_test::traced_test] #[test] fn test_run_need_move() { - let scaler = Scaler::new( - watcher::Watcher::default(), - queuer::Queuer::default(), - ProverAutoscalerScalerConfig { - cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), - min_provers: [("prover".into(), 2)].into(), - max_provers: [ - ("foo".into(), [(Gpu::L4, 100)].into()), - ("bar".into(), [(Gpu::L4, 100)].into()), - ] - .into(), - long_pending_duration: ProverAutoscalerScalerConfig::default_long_pending_duration( - ), - ..Default::default() - }, - ); + let scaler = GpuScaler::new(ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover".into(), 2)].into(), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + long_pending_duration: ProverAutoscalerScalerConfig::default_long_pending_duration(), + ..Default::default() + }); assert_eq!( scaler.run( diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs index d94ac8b97e9..853e3db000f 100644 --- a/prover/crates/bin/prover_autoscaler/src/metrics.rs +++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs @@ -10,10 +10,13 @@ pub(crate) struct AutoscalerMetrics { pub prover_protocol_version: LabeledFamily<(String, String), Gauge, 2>, #[metrics(labels = ["target_cluster", "target_namespace", "gpu"])] pub provers: 
LabeledFamily<(String, String, Gpu), Gauge, 3>, + #[metrics(labels = ["job", "target_cluster", "target_namespace"])] + pub jobs: LabeledFamily<(String, String, String), Gauge, 3>, pub clusters_not_ready: Counter, #[metrics(labels = ["target", "status"])] pub calls: LabeledFamily<(String, u16), Counter, 2>, - // TODO: count of command send succes/fail + #[metrics(labels = ["target_cluster"])] + pub scale_errors: LabeledFamily, 1>, } #[vise::register] From 3be6663a2d766f38c8f26687aabf07554667533a Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Mon, 28 Oct 2024 12:03:11 +0100 Subject: [PATCH 3/8] Use sum_queue fn. --- .../prover_autoscaler/src/global/queuer.rs | 36 ++++++++----------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index d6801e0d1b5..02510d81e62 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -4,6 +4,7 @@ use anyhow::{Context, Ok}; use reqwest::Method; use zksync_config::configs::prover_autoscaler::{QueueReportFields, ScalerTarget}; use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; +use zksync_types::prover_dal::JobCountStatistics; use zksync_utils::http_with_retries::send_request_with_retries; use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}; @@ -18,31 +19,22 @@ pub struct Queuer { pub targets: Vec, } +fn sum_queue(jobs: JobCountStatistics) -> u64 { + (jobs.queued + jobs.in_progress) as u64 +} + fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 { - let q = match target { - QueueReportFields::basic_witness_jobs => { - report.basic_witness_jobs.queued + report.basic_witness_jobs.in_progress - } - QueueReportFields::leaf_witness_jobs => { - report.leaf_witness_jobs.queued + report.leaf_witness_jobs.in_progress - } - QueueReportFields::node_witness_jobs 
=> { - report.node_witness_jobs.queued + report.node_witness_jobs.in_progress - } + match target { + QueueReportFields::basic_witness_jobs => sum_queue(report.basic_witness_jobs), + QueueReportFields::leaf_witness_jobs => sum_queue(report.leaf_witness_jobs), + QueueReportFields::node_witness_jobs => sum_queue(report.node_witness_jobs), QueueReportFields::recursion_tip_witness_jobs => { - report.recursion_tip_witness_jobs.queued + report.recursion_tip_witness_jobs.in_progress + sum_queue(report.recursion_tip_witness_jobs) } - QueueReportFields::scheduler_witness_jobs => { - report.scheduler_witness_jobs.queued + report.scheduler_witness_jobs.in_progress - } - QueueReportFields::proof_compressor_jobs => { - report.proof_compressor_jobs.queued + report.proof_compressor_jobs.in_progress - } - QueueReportFields::prover_jobs => { - report.prover_jobs.queued + report.prover_jobs.in_progress - } - }; - q as u64 + QueueReportFields::scheduler_witness_jobs => sum_queue(report.scheduler_witness_jobs), + QueueReportFields::proof_compressor_jobs => sum_queue(report.proof_compressor_jobs), + QueueReportFields::prover_jobs => sum_queue(report.prover_jobs), + } } impl Queuer { From 795d987fe4474c9164d41e981a02ab75bbc71ea8 Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Mon, 28 Oct 2024 12:12:24 +0100 Subject: [PATCH 4/8] Remove unused targets --- prover/crates/bin/prover_autoscaler/src/global/queuer.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index 02510d81e62..c66130354c9 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -16,7 +16,6 @@ pub type Queue = HashMap<(String, QueueReportFields), u64>; #[derive(Default)] pub struct Queuer { pub prover_job_monitor_url: String, - pub targets: Vec, } fn sum_queue(jobs: JobCountStatistics) -> u64 { @@ -41,7 +40,6 @@ impl Queuer { 
pub fn new(pjm_url: String) -> Self { Self { prover_job_monitor_url: pjm_url, - targets: Vec::default(), } } From ff93edd14425459e15b65ec89a083d133add4c7c Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Mon, 28 Oct 2024 12:24:40 +0100 Subject: [PATCH 5/8] Remove unused import --- prover/crates/bin/prover_autoscaler/src/global/queuer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index c66130354c9..6983f954dc4 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use anyhow::{Context, Ok}; use reqwest::Method; -use zksync_config::configs::prover_autoscaler::{QueueReportFields, ScalerTarget}; +use zksync_config::configs::prover_autoscaler::QueueReportFields; use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; use zksync_types::prover_dal::JobCountStatistics; use zksync_utils::http_with_retries::send_request_with_retries; From 03e99bed99aaa7dca162d099f2632153a40733cf Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Mon, 28 Oct 2024 13:49:04 +0100 Subject: [PATCH 6/8] Move sum fn to basic_types lib. 
--- core/lib/basic_types/src/prover_dal.rs | 18 ++++++++----- .../prover_autoscaler/src/global/queuer.rs | 26 +++++++------------ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index bec5a55ced1..8bef0190e2a 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -28,12 +28,6 @@ pub struct ExtendedJobCountStatistics { pub successful: usize, } -#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] -pub struct JobCountStatistics { - pub queued: usize, - pub in_progress: usize, -} - impl Add for ExtendedJobCountStatistics { type Output = ExtendedJobCountStatistics; @@ -47,6 +41,18 @@ impl Add for ExtendedJobCountStatistics { } } +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] +pub struct JobCountStatistics { + pub queued: usize, + pub in_progress: usize, +} + +impl JobCountStatistics { + pub fn sum(&self) -> usize { + self.queued + self.in_progress + } +} + #[derive(Debug)] pub struct StuckJobs { pub id: u64, diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index 6983f954dc4..a01530fb6c6 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -4,7 +4,6 @@ use anyhow::{Context, Ok}; use reqwest::Method; use zksync_config::configs::prover_autoscaler::QueueReportFields; use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport}; -use zksync_types::prover_dal::JobCountStatistics; use zksync_utils::http_with_retries::send_request_with_retries; use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}; @@ -18,22 +17,17 @@ pub struct Queuer { pub prover_job_monitor_url: String, } -fn sum_queue(jobs: JobCountStatistics) -> u64 { - (jobs.queued + jobs.in_progress) as u64 -} - fn target_to_queue(target: 
QueueReportFields, report: &QueueReport) -> u64 { - match target { - QueueReportFields::basic_witness_jobs => sum_queue(report.basic_witness_jobs), - QueueReportFields::leaf_witness_jobs => sum_queue(report.leaf_witness_jobs), - QueueReportFields::node_witness_jobs => sum_queue(report.node_witness_jobs), - QueueReportFields::recursion_tip_witness_jobs => { - sum_queue(report.recursion_tip_witness_jobs) - } - QueueReportFields::scheduler_witness_jobs => sum_queue(report.scheduler_witness_jobs), - QueueReportFields::proof_compressor_jobs => sum_queue(report.proof_compressor_jobs), - QueueReportFields::prover_jobs => sum_queue(report.prover_jobs), - } + let res = match target { + QueueReportFields::basic_witness_jobs => report.basic_witness_jobs.sum(), + QueueReportFields::leaf_witness_jobs => report.leaf_witness_jobs.sum(), + QueueReportFields::node_witness_jobs => report.node_witness_jobs.sum(), + QueueReportFields::recursion_tip_witness_jobs => report.recursion_tip_witness_jobs.sum(), + QueueReportFields::scheduler_witness_jobs => report.scheduler_witness_jobs.sum(), + QueueReportFields::proof_compressor_jobs => report.proof_compressor_jobs.sum(), + QueueReportFields::prover_jobs => report.prover_jobs.sum(), + }; + res as u64 } impl Queuer { From 8be71f2d1967fca7ea640157cee26958400ec058 Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Tue, 29 Oct 2024 12:55:13 +0100 Subject: [PATCH 7/8] Address the comments --- core/lib/basic_types/src/prover_dal.rs | 3 +- .../prover_autoscaler/src/global/queuer.rs | 53 +++++++++++-------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index 8bef0190e2a..d86f79ba77a 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -48,7 +48,8 @@ pub struct JobCountStatistics { } impl JobCountStatistics { - pub fn sum(&self) -> usize { + /// all returns sum of queued and in_progress. 
+ pub fn all(&self) -> usize { self.queued + self.in_progress } } diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs index a01530fb6c6..e2cd1c6a4fb 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, ops::Deref}; use anyhow::{Context, Ok}; use reqwest::Method; @@ -10,22 +10,29 @@ use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE}; const MAX_RETRIES: usize = 5; -pub type Queue = HashMap<(String, QueueReportFields), u64>; +pub struct Queue(HashMap<(String, QueueReportFields), u64>); + +impl Deref for Queue { + type Target = HashMap<(String, QueueReportFields), u64>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} #[derive(Default)] pub struct Queuer { pub prover_job_monitor_url: String, } -fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 { +fn target_to_queue(target: &QueueReportFields, report: &QueueReport) -> u64 { let res = match target { - QueueReportFields::basic_witness_jobs => report.basic_witness_jobs.sum(), - QueueReportFields::leaf_witness_jobs => report.leaf_witness_jobs.sum(), - QueueReportFields::node_witness_jobs => report.node_witness_jobs.sum(), - QueueReportFields::recursion_tip_witness_jobs => report.recursion_tip_witness_jobs.sum(), - QueueReportFields::scheduler_witness_jobs => report.scheduler_witness_jobs.sum(), - QueueReportFields::proof_compressor_jobs => report.proof_compressor_jobs.sum(), - QueueReportFields::prover_jobs => report.prover_jobs.sum(), + QueueReportFields::basic_witness_jobs => report.basic_witness_jobs.all(), + QueueReportFields::leaf_witness_jobs => report.leaf_witness_jobs.all(), + QueueReportFields::node_witness_jobs => report.node_witness_jobs.all(), + QueueReportFields::recursion_tip_witness_jobs => report.recursion_tip_witness_jobs.all(), + 
QueueReportFields::scheduler_witness_jobs => report.scheduler_witness_jobs.all(), + QueueReportFields::proof_compressor_jobs => report.proof_compressor_jobs.all(), + QueueReportFields::prover_jobs => report.prover_jobs.all(), }; res as u64 } @@ -37,12 +44,14 @@ impl Queuer { } } + /// Requests queue report from prover-job-monitor and parse it into Queue HashMap for provided + /// list of jobs. pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result { let url = &self.prover_job_monitor_url; let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await; let response = response.map_err(|err| { AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc(); - anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}") + anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}") })?; AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc(); @@ -50,16 +59,18 @@ impl Queuer { .json::>() .await .context("Failed to read response as json")?; - Ok(response - .iter() - .flat_map(|x| { - jobs.iter().map(move |j| { - ( - (x.version.to_string(), j.clone()), - target_to_queue(j.clone(), &x.report), - ) + Ok(Queue( + response + .iter() + .flat_map(|versioned_report| { + jobs.iter().map(move |j| { + ( + (versioned_report.version.to_string(), j.clone()), + target_to_queue(j, &versioned_report.report), + ) + }) }) - }) - .collect::>()) + .collect::>(), + )) } } From 8167e03d41fc4e059a9b8468d81ad7083a85349d Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Tue, 29 Oct 2024 14:02:07 +0100 Subject: [PATCH 8/8] Simpler pod filtering --- .../crates/bin/prover_autoscaler/src/global/scaler.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index fdc163aea0d..1bdd2b25104 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ 
b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -419,12 +419,11 @@ impl SimpleScaler { pool.pods.insert(PodStatus::Running, 0); let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap(); - for pod in namespace_value.pods.iter().filter_map(|(pod_name, pod)| { - if pod_re.is_match(pod_name) { - return Some(pod); - } - None - }) { + for (_, pod) in namespace_value + .pods + .iter() + .filter(|(name, _)| pod_re.is_match(name)) + { let mut status = PodStatus::from_str(&pod.status).unwrap_or_default(); if status == PodStatus::Pending && pod.changed < Utc::now() - self.long_pending_duration {