diff --git a/core/node/node_framework/src/implementations/layers/postgres.rs b/core/node/node_framework/src/implementations/layers/postgres.rs index 8a81b8709895..bf602f1de631 100644 --- a/core/node/node_framework/src/implementations/layers/postgres.rs +++ b/core/node/node_framework/src/implementations/layers/postgres.rs @@ -1,11 +1,15 @@ -use std::time::Duration; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; -use serde::{Deserialize, Serialize}; -use tokio::sync::watch; +use async_trait::async_trait; +use serde::Serialize; +use tokio::sync::RwLock; use zksync_dal::{ metrics::PostgresMetrics, system_dal::DatabaseMigration, ConnectionPool, Core, CoreDal, }; -use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; +use zksync_health_check::{CheckHealth, Health, HealthStatus}; use crate::{ implementations::resources::{ @@ -38,8 +42,6 @@ pub struct Input { pub struct Output { #[context(task)] pub metrics_task: PostgresMetricsScrapingTask, - #[context(task)] - pub health_task: DatabaseHealthTask, } #[async_trait::async_trait] @@ -58,16 +60,15 @@ impl WiringLayer for PostgresLayer { }; let app_health = input.app_health.0; - let health_task = DatabaseHealthTask::new(pool); - app_health - .insert_component(health_task.health_check()) + .insert_custom_component(Arc::new(DatabaseHealthCheck { + polling_interval: TASK_EXECUTION_INTERVAL, + pool, + cached: RwLock::default(), + })) .map_err(WiringError::internal)?; - Ok(Output { - metrics_task, - health_task, - }) + Ok(Output { metrics_task }) } } @@ -99,7 +100,7 @@ impl Task for PostgresMetricsScrapingTask { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct DatabaseInfo { last_migration: DatabaseMigration, } @@ -111,62 +112,60 @@ impl From for Health { } #[derive(Debug)] -pub struct DatabaseHealthTask { +struct DatabaseHealthCheck { polling_interval: Duration, - connection_pool: ConnectionPool, - updater: HealthUpdater, + pool: ConnectionPool, + cached: RwLock>, } -impl DatabaseHealthTask { - fn new(connection_pool: ConnectionPool) -> Self { - Self { - polling_interval: TASK_EXECUTION_INTERVAL, - connection_pool, - updater: ReactiveHealthCheck::new("database").1, - } +impl DatabaseHealthCheck { + async fn update(&self) -> anyhow::Result { + let mut conn = self.pool.connection_tagged("postgres_healthcheck").await?; + let last_migration = conn.system_dal().get_last_migration().await?; + Ok(DatabaseInfo { last_migration }) } - async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> - where - Self: Sized, - { - let timeout = self.polling_interval; - let mut conn = self - .connection_pool - .connection_tagged("postgres_healthcheck") - .await?; - - tracing::info!("Starting database healthcheck with frequency: {timeout:?}",); - - while !*stop_receiver.borrow_and_update() { - let last_migration = conn.system_dal().get_last_migration().await?; - self.updater.update(DatabaseInfo { last_migration }.into()); - - // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. - tokio::time::timeout(timeout, stop_receiver.changed()) - .await - .ok(); + fn validate_cache(&self, cache: Option<&(DatabaseInfo, Instant)>) -> Option { + let now = Instant::now(); + if let Some((cached, cached_at)) = cache { + let elapsed = now + .checked_duration_since(*cached_at) + .unwrap_or(Duration::ZERO); + (elapsed <= self.polling_interval).then(|| cached.clone()) + } else { + None } - tracing::info!("Stop signal received; database healthcheck is shut down"); - Ok(()) - } - - pub fn health_check(&self) -> ReactiveHealthCheck { - self.updater.subscribe() } } -#[async_trait::async_trait] -impl Task for DatabaseHealthTask { - fn kind(&self) -> TaskKind { - TaskKind::UnconstrainedTask +#[async_trait] +impl CheckHealth for DatabaseHealthCheck { + fn name(&self) -> &'static str { + "database" } - fn id(&self) -> TaskId { - "database_health".into() - } + // If the DB malfunctions, this method would time out, which would lead to the health check marked as failed. + async fn check_health(&self) -> Health { + let cached = self.cached.read().await.clone(); + if let Some(cache) = self.validate_cache(cached.as_ref()) { + return cache.into(); + } - async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - (*self).run(stop_receiver.0).await + let mut cached_lock = self.cached.write().await; + // The cached value may have been updated by another task. + if let Some(cache) = self.validate_cache(cached_lock.as_ref()) { + return cache.into(); + } + + match self.update().await { + Ok(info) => { + *cached_lock = Some((info.clone(), Instant::now())); + info.into() + } + Err(err) => { + tracing::warn!("Error updating database health: {err:#}"); + cached.map_or_else(|| HealthStatus::Affected.into(), |(info, _)| info.into()) + } + } } }