Rework database healthcheck
slowli committed Nov 27, 2024
1 parent 7b8f0ec · commit 955d318
Showing 1 changed file with 58 additions and 59 deletions.
core/node/node_framework/src/implementations/layers/postgres.rs (117 changes: 58 additions & 59 deletions)
@@ -1,11 +1,15 @@
-use std::time::Duration;
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
-use serde::{Deserialize, Serialize};
-use tokio::sync::watch;
+use async_trait::async_trait;
+use serde::Serialize;
+use tokio::sync::RwLock;
 use zksync_dal::{
     metrics::PostgresMetrics, system_dal::DatabaseMigration, ConnectionPool, Core, CoreDal,
 };
-use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
+use zksync_health_check::{CheckHealth, Health, HealthStatus};
 
 use crate::{
     implementations::resources::{
@@ -38,8 +42,6 @@ pub struct Input {
 pub struct Output {
     #[context(task)]
     pub metrics_task: PostgresMetricsScrapingTask,
-    #[context(task)]
-    pub health_task: DatabaseHealthTask,
 }
 
 #[async_trait::async_trait]
@@ -58,16 +60,15 @@ impl WiringLayer for PostgresLayer {
         };
 
         let app_health = input.app_health.0;
-        let health_task = DatabaseHealthTask::new(pool);
-
         app_health
-            .insert_component(health_task.health_check())
+            .insert_custom_component(Arc::new(DatabaseHealthCheck {
+                polling_interval: TASK_EXECUTION_INTERVAL,
+                pool,
+                cached: RwLock::default(),
+            }))
             .map_err(WiringError::internal)?;
 
-        Ok(Output {
-            metrics_task,
-            health_task,
-        })
+        Ok(Output { metrics_task })
     }
 }
 
@@ -99,7 +100,7 @@ impl Task for PostgresMetricsScrapingTask {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize)]
 pub struct DatabaseInfo {
     last_migration: DatabaseMigration,
 }
@@ -111,62 +112,60 @@ impl From<DatabaseInfo> for Health {
 }
 
 #[derive(Debug)]
-pub struct DatabaseHealthTask {
+struct DatabaseHealthCheck {
     polling_interval: Duration,
-    connection_pool: ConnectionPool<Core>,
-    updater: HealthUpdater,
+    pool: ConnectionPool<Core>,
+    cached: RwLock<Option<(DatabaseInfo, Instant)>>,
 }
 
-impl DatabaseHealthTask {
-    fn new(connection_pool: ConnectionPool<Core>) -> Self {
-        Self {
-            polling_interval: TASK_EXECUTION_INTERVAL,
-            connection_pool,
-            updater: ReactiveHealthCheck::new("database").1,
-        }
-    }
-
-    async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()>
-    where
-        Self: Sized,
-    {
-        let timeout = self.polling_interval;
-        let mut conn = self
-            .connection_pool
-            .connection_tagged("postgres_healthcheck")
-            .await?;
-
-        tracing::info!("Starting database healthcheck with frequency: {timeout:?}",);
-
-        while !*stop_receiver.borrow_and_update() {
-            let last_migration = conn.system_dal().get_last_migration().await?;
-            self.updater.update(DatabaseInfo { last_migration }.into());
-
-            // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
-            tokio::time::timeout(timeout, stop_receiver.changed())
-                .await
-                .ok();
-        }
-        tracing::info!("Stop signal received; database healthcheck is shut down");
-        Ok(())
-    }
-
-    pub fn health_check(&self) -> ReactiveHealthCheck {
-        self.updater.subscribe()
-    }
-}
-
-#[async_trait::async_trait]
-impl Task for DatabaseHealthTask {
-    fn kind(&self) -> TaskKind {
-        TaskKind::UnconstrainedTask
-    }
-
-    fn id(&self) -> TaskId {
-        "database_health".into()
-    }
-
-    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        (*self).run(stop_receiver.0).await
-    }
-}
+impl DatabaseHealthCheck {
+    async fn update(&self) -> anyhow::Result<DatabaseInfo> {
+        let mut conn = self.pool.connection_tagged("postgres_healthcheck").await?;
+        let last_migration = conn.system_dal().get_last_migration().await?;
+        Ok(DatabaseInfo { last_migration })
+    }
+
+    fn validate_cache(&self, cache: Option<&(DatabaseInfo, Instant)>) -> Option<DatabaseInfo> {
+        let now = Instant::now();
+        if let Some((cached, cached_at)) = cache {
+            let elapsed = now
+                .checked_duration_since(*cached_at)
+                .unwrap_or(Duration::ZERO);
+            (elapsed <= self.polling_interval).then(|| cached.clone())
+        } else {
+            None
+        }
+    }
+}
+
+#[async_trait]
+impl CheckHealth for DatabaseHealthCheck {
+    fn name(&self) -> &'static str {
+        "database"
+    }
+
+    // If the DB malfunctions, this method would time out, which would lead to the health check marked as failed.
+    async fn check_health(&self) -> Health {
+        let cached = self.cached.read().await.clone();
+        if let Some(cache) = self.validate_cache(cached.as_ref()) {
+            return cache.into();
+        }
+
+        let mut cached_lock = self.cached.write().await;
+        // The cached value may have been updated by another task.
+        if let Some(cache) = self.validate_cache(cached_lock.as_ref()) {
+            return cache.into();
+        }
+
+        match self.update().await {
+            Ok(info) => {
+                *cached_lock = Some((info.clone(), Instant::now()));
+                info.into()
+            }
+            Err(err) => {
+                tracing::warn!("Error updating database health: {err:#}");
+                cached.map_or_else(|| HealthStatus::Affected.into(), |(info, _)| info.into())
+            }
+        }
+    }
+}
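
Note on the pattern: this commit replaces the old background task (which pushed updates through a HealthUpdater) with an on-demand CheckHealth implementation that caches the last DatabaseInfo for one polling interval and refreshes it lazily under a write lock. Below is a minimal, self-contained sketch of that double-checked caching idea; the Cache type, fetch_value function, and TTL value are hypothetical stand-ins for the connection pool query and DatabaseInfo, not code from the commit (assumes tokio with the sync, macros, and rt features).

use std::time::{Duration, Instant};

use tokio::sync::RwLock;

struct Cache {
    ttl: Duration,
    // Last fetched value and the instant it was stored (stand-in for `DatabaseInfo`).
    slot: RwLock<Option<(String, Instant)>>,
}

impl Cache {
    // Returns the cached value only if it is younger than the TTL.
    fn fresh(&self, slot: Option<&(String, Instant)>) -> Option<String> {
        let (value, stored_at) = slot?;
        (stored_at.elapsed() <= self.ttl).then(|| value.clone())
    }

    async fn get(&self) -> String {
        // Fast path: a read lock suffices while the cached value is fresh.
        let cached = self.slot.read().await.clone();
        if let Some(value) = self.fresh(cached.as_ref()) {
            return value;
        }
        // Slow path: take the write lock, then re-check, since another task
        // may have refreshed the cache while we waited for the lock.
        let mut slot = self.slot.write().await;
        if let Some(value) = self.fresh(slot.as_ref()) {
            return value;
        }
        let value = fetch_value().await; // stands in for the DB query in the real check
        *slot = Some((value.clone(), Instant::now()));
        value
    }
}

// Hypothetical slow operation; the real check queries the last applied migration.
async fn fetch_value() -> String {
    "last_migration: 20241127".to_owned()
}

#[tokio::main]
async fn main() {
    let cache = Cache {
        ttl: Duration::from_secs(10),
        slot: RwLock::new(None),
    };
    println!("{}", cache.get().await); // fetches once
    println!("{}", cache.get().await); // served from the cache
}

Unlike the real check_health, this sketch does not keep serving a stale value when the refresh fails; the diff falls back to the previously cached DatabaseInfo (or HealthStatus::Affected) in that case.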
