fix(prover): Run for zero queue to allow scaling down to 0 #3115

Merged
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/prover_autoscaler.rs
@@ -51,6 +51,8 @@ pub struct ProverAutoscalerScalerConfig {
pub cluster_priorities: HashMap<String, u32>,
/// Prover speed per GPU. Used to calculate the desired number of provers for a given queue size.
pub prover_speed: HashMap<Gpu, u32>,
/// Maximum number of provers that can be run per cluster/GPU.
pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
/// Duration after which a pending pod is considered long-pending.
#[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
pub long_pending_duration: Duration,
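
The new max_provers field nests per-GPU caps under cluster names. A minimal sketch of the shape (the cluster name "cluster-a" is hypothetical, mirroring the test further down):

    use std::collections::HashMap;

    let max_provers: HashMap<String, HashMap<Gpu, u32>> =
        HashMap::from([("cluster-a".to_string(), HashMap::from([(Gpu::L4, 100)]))]);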
@@ -34,6 +34,11 @@ message ProverSpeed {
optional uint32 speed = 2; // required
}

message MaxProver {
optional string cluster_and_gpu = 1; // required, format: <cluster>/<gpu>
optional uint32 max = 2; // required
}

message ProverAutoscalerScalerConfig {
optional uint32 prometheus_port = 1; // required
optional std.Duration scaler_run_interval = 2; // optional
@@ -43,4 +48,5 @@ message ProverAutoscalerScalerConfig {
repeated ClusterPriority cluster_priorities = 6; // optional
repeated ProverSpeed prover_speed = 7; // optional
optional uint32 long_pending_duration_s = 8; // optional
repeated MaxProver max_provers = 9; // optional
}
38 changes: 38 additions & 0 deletions core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Context as _;
use time::Duration;
use zksync_config::configs::{self, prover_autoscaler::Gpu};
@@ -92,6 +94,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
Some(s) => Duration::seconds(s.into()),
None => Self::Type::default_long_pending_duration(),
},
max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| {
let (cluster_and_gpu, max) = e.read().expect("max_provers");
if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
acc.entry(cluster.to_string())
.or_default()
.insert(gpu.parse().expect("max_provers/gpu"), max);
}
acc
}),
})
}

@@ -117,6 +128,15 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.map(|(k, v)| proto::ProverSpeed::build(&(*k, *v)))
.collect(),
long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32),
max_provers: this
.max_provers
.iter()
.flat_map(|(cluster, inner_map)| {
inner_map.iter().map(move |(gpu, max)| {
proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max))
})
})
.collect(),
}
}
}
@@ -170,3 +190,21 @@ impl ProtoRepr for proto::ProverSpeed {
}
}
}

impl ProtoRepr for proto::MaxProver {
type Type = (String, u32);
fn read(&self) -> anyhow::Result<Self::Type> {
Ok((
required(&self.cluster_and_gpu)
.context("cluster_and_gpu")?
.parse()?,
*required(&self.max).context("max")?,
))
}
fn build(this: &Self::Type) -> Self {
Self {
cluster_and_gpu: Some(this.0.to_string()),
max: Some(this.1),
}
}
}
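
A quick round-trip of the new message, for illustration (the "cluster-a/L4" key is a hypothetical instance of the <cluster>/<gpu> format):

    let entry = proto::MaxProver::build(&("cluster-a/L4".to_string(), 10));
    assert_eq!(entry.read().unwrap(), ("cluster-a/L4".to_string(), 10));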
10 changes: 1 addition & 9 deletions prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -36,19 +36,11 @@ pub struct Namespace {
pub pods: HashMap<String, Pod>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Cluster {
pub name: String,
pub namespaces: HashMap<String, Namespace>,
}
impl Default for Cluster {
fn default() -> Self {
Self {
name: "".to_string(),
namespaces: HashMap::new(),
}
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Clusters {
59 changes: 47 additions & 12 deletions prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -56,6 +56,7 @@ pub struct Scaler {

/// Which cluster to use first.
cluster_priorities: HashMap<String, u32>,
max_provers: HashMap<String, HashMap<Gpu, u32>>,
prover_speed: HashMap<Gpu, u32>,
long_pending_duration: chrono::Duration,
}
@@ -87,6 +88,7 @@ impl Scaler {
watcher,
queuer,
cluster_priorities: config.cluster_priorities,
max_provers: config.max_provers,
prover_speed: config.prover_speed,
long_pending_duration: chrono::Duration::seconds(
config.long_pending_duration.whole_seconds(),
@@ -112,7 +114,12 @@ impl Scaler {
let e = gp_map.entry(gpu).or_insert(GPUPool {
name: cluster.name.clone(),
gpu,
max_pool_size: 100, // TODO: get from the agent.
max_pool_size: self
.max_provers
.get(&cluster.name)
.and_then(|inner_map| inner_map.get(&gpu))
.copied()
.unwrap_or(0),
..Default::default()
});
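
Note the unwrap_or(0): a cluster/GPU pair that is absent from max_provers now gets a pool size of zero instead of the old hard-coded 100, so unconfigured pools are never scheduled onto. A sketch of the lookup with a hypothetical unconfigured cluster:

    let cap = max_provers
        .get("cluster-b")
        .and_then(|inner_map| inner_map.get(&Gpu::L4))
        .copied()
        .unwrap_or(0); // not configured => cap of 0, pool stays empty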

@@ -265,23 +272,46 @@ impl Scaler {
}
}

tracing::debug!("run result: provers {:?}, total: {}", &provers, total);
tracing::debug!(
"run result for namespace {}: provers {:?}, total: {}",
namespace,
&provers,
total
);

provers
}
}

/// is_namespace_running returns true if the namespace has any pods running or expected to run.
fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
clusters
.clusters
.values()
.flat_map(|v| v.namespaces.iter())
.filter_map(|(k, v)| if k == namespace { Some(v) } else { None })
.flat_map(|v| v.deployments.values())
.map(
|d| d.running + d.desired, // If there is something running or expected to run, we
// should consider the namespace.
)
.sum::<i32>()
> 0
}

#[async_trait::async_trait]
impl Task for Scaler {
async fn invoke(&self) -> anyhow::Result<()> {
let queue = self.queuer.get_queue().await.unwrap();

// TODO: Check that clusters data is ready.
let clusters = self.watcher.clusters.lock().await;
let guard = self.watcher.data.lock().await;
watcher::check_is_ready(&guard.is_ready)?;

for (ns, ppv) in &self.namespaces {
let q = queue.queue.get(ppv).cloned().unwrap_or(0);
if q > 0 {
let provers = self.run(ns, q, &clusters);
tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let provers = self.run(ns, q, &guard.clusters);
for (k, num) in &provers {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
@@ -302,22 +332,27 @@ mod tests {

use super::*;
use crate::{
cluster_types::{self, Deployment, Namespace, Pod},
cluster_types::{Deployment, Namespace, Pod},
global::{queuer, watcher},
};

#[test]
fn test_run() {
let watcher = watcher::Watcher {
cluster_agents: vec![],
clusters: Arc::new(Mutex::new(cluster_types::Clusters {
..Default::default()
})),
data: Arc::new(Mutex::new(watcher::WatchedData::default())),
};
let queuer = queuer::Queuer {
prover_job_monitor_url: "".to_string(),
};
let scaler = Scaler::new(watcher, queuer, ProverAutoscalerScalerConfig::default());
let scaler = Scaler::new(
watcher,
queuer,
ProverAutoscalerScalerConfig {
max_provers: HashMap::from([("foo".to_string(), HashMap::from([(Gpu::L4, 100)]))]),
..Default::default()
},
);
let got = scaler.run(
&"prover".to_string(),
1499,
@@ -355,6 +390,6 @@ mod tests {
},
3,
)]);
assert!(got == want);
assert_eq!(got, want);
}
}
49 changes: 34 additions & 15 deletions prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::{Context, Ok};
use anyhow::{anyhow, Context, Ok, Result};
use futures::future;
use reqwest::Method;
use tokio::sync::Mutex;
@@ -12,15 +12,31 @@ use crate::{
task_wiring::Task,
};

#[derive(Default)]
pub struct WatchedData {
pub clusters: Clusters,
pub is_ready: Vec<bool>,
}

pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
for b in v {
if !b {
return Err(anyhow!("Clusters data is not ready"));
}
}
Ok(())
}
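
The readiness gate prevents the scaler from acting on partial data: every agent slot must have reported at least once before any scaling decision is made. For example:

    assert!(check_is_ready(&vec![true, true]).is_ok());
    assert!(check_is_ready(&vec![true, false]).is_err()); // one agent has not reported yet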

#[derive(Clone)]
pub struct Watcher {
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub clusters: Arc<Mutex<Clusters>>,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>) -> Self {
let size = agent_urls.len();
Self {
cluster_agents: agent_urls
.into_iter()
@@ -31,8 +47,11 @@ impl Watcher {
)
})
.collect(),
clusters: Arc::new(Mutex::new(Clusters {
clusters: HashMap::new(),
data: Arc::new(Mutex::new(WatchedData {
clusters: Clusters {
clusters: HashMap::new(),
},
is_ready: vec![false; size],
})),
}
}
@@ -45,7 +64,8 @@ impl Task for Watcher {
.cluster_agents
.clone()
.into_iter()
.map(|a| {
.enumerate()
.map(|(i, a)| {
tracing::debug!("Getting cluster data from agent {}.", a);
tokio::spawn(async move {
let url: String = a
@@ -55,13 +75,14 @@ impl Task for Watcher {
.to_string();
let response =
send_request_with_retries(&url, 5, Method::GET, None, None).await;
response
let res = response
.map_err(|err| {
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?
.json::<Cluster>()
.await
.context("Failed to read response as json")
.context("Failed to read response as json");
Ok((i, res))
})
})
.collect();
@@ -71,18 +92,16 @@ impl Task for Watcher {
.await
.into_iter()
.map(|h| async move {
let c = h.unwrap().unwrap();
self.clusters
.lock()
.await
.clusters
.insert(c.name.clone(), c);
let (i, res) = h??;
let c = res?;
let mut guard = self.data.lock().await;
guard.clusters.clusters.insert(c.name.clone(), c);
guard.is_ready[i] = true;
Ok(())
})
.collect::<Vec<_>>(),
)
.await
.unwrap();
.await?;

Ok(())
}
14 changes: 6 additions & 8 deletions prover/crates/bin/prover_autoscaler/src/main.rs
@@ -80,24 +80,21 @@ async fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::ring::default_provider().install_default();
let client = kube::Client::try_default().await?;

tracing::info!("Starting ProverAutoscaler");

let mut tasks = vec![];

match opt.job {
AutoscalerType::Agent => {
let cluster = opt
.cluster_name
.context("cluster_name is required for Agent")?;
tracing::info!("Starting ProverAutoscaler Agent for cluster {}", cluster);
let agent_config = general_config.agent_config.context("agent_config")?;
let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));

// TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
// http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
let watcher = Watcher::new(
client.clone(),
opt.cluster_name
.context("cluster_name is required for Agent")?,
agent_config.namespaces,
);
let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces);
let scaler = Scaler { client };
tasks.push(tokio::spawn(watcher.clone().run()));
tasks.push(tokio::spawn(agent::run_server(
@@ -108,6 +105,7 @@ async fn main() -> anyhow::Result<()> {
)))
}
AutoscalerType::Scaler => {
tracing::info!("Starting ProverAutoscaler Scaler");
let scaler_config = general_config.scaler_config.context("scaler_config")?;
let interval = scaler_config.scaler_run_interval.unsigned_abs();
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);