Skip to content

Commit

Permalink
Treat connection loss
Browse files Browse the repository at this point in the history
  • Loading branch information
imbolc committed Jun 27, 2024
1 parent dee7b2b commit 4cf5154
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 10 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use chrono::{DateTime, Utc};
use sqlx::{types::Uuid, PgPool};
use std::time::Duration;

const LOST_CONNECTION_SLEEP: Duration = Duration::from_secs(1);

/// Enqueues the task to be run immediately
pub async fn enqueue(db: &PgPool, task: &impl Scheduler) -> Result<Uuid> {
task.enqueue(db).await
Expand Down
11 changes: 11 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ pub fn ordinal(n: i32) -> String {
},
}
}

/// Waits for the db reconnection
pub async fn wait_for_reconnection(db: &sqlx::PgPool, sleep: std::time::Duration) {
while let Err(sqlx::Error::Io(_)) = sqlx::query!("SELECT id FROM pg_task LIMIT 1")
.fetch_optional(db)
.await
{
tracing::trace!("Waiting for db reconnection");
tokio::time::sleep(sleep).await;
}
}
15 changes: 12 additions & 3 deletions src/waiter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{util, LOST_CONNECTION_SLEEP};
use sqlx::{postgres::PgListener, PgPool};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{futures::Notified, Notify},
time::timeout,
time::{sleep, timeout},
};
use tracing::trace;
use tracing::{trace, warn};

/// Waits for the tasks table to change.
///
/// A newtype around a shared [`Notify`] handle (`Arc<Notify>`), so the same
/// notifier can be cloned into a background task while this value is used to
/// wait on it.
pub struct Waiter(Arc<Notify>);
Expand Down Expand Up @@ -32,7 +33,15 @@ impl Waiter {
let notify = self.0.clone();
tokio::spawn(async move {
loop {
listener.recv().await.unwrap();
if let Err(e) = listener.recv().await {
warn!("Listening for the tasks table changes is interrupted (probably due to db connection loss):\n{}", source_chain::to_string(&e));
sleep(LOST_CONNECTION_SLEEP).await;
util::wait_for_reconnection(&db, LOST_CONNECTION_SLEEP).await;
warn!("Listening for the tasks table changes is probably restored");
// Absence of `continue` isn't a bug. We have to behave as
// if a change was detected since a notification could be
// lost during the interruption
}
notify.notify_waiters();
}
});
Expand Down
23 changes: 16 additions & 7 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{util, waiter::Waiter, NextStep, Step, StepError};
use crate::{util, waiter::Waiter, NextStep, Step, StepError, LOST_CONNECTION_SLEEP};
use chrono::{DateTime, Utc};
use code_path::code_path;
use sqlx::{
Expand All @@ -7,7 +7,7 @@ use sqlx::{
};
use std::{marker::PhantomData, time::Duration};
use tokio::time::sleep;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, trace, warn};

/// An error report to log from the worker
#[derive(Debug, displaydoc::Display, thiserror::Error)]
Expand Down Expand Up @@ -85,11 +85,20 @@ impl<S: Step<S>> Worker<S> {

// TODO concurrency
loop {
let Ok(task) = self.recv_task().await.map_err(ErrorReport::log) else {
sleep(Duration::from_secs(1)).await;
continue;
match self.recv_task().await {
Ok(task) => {
self.run_step(task).await.map_err(ErrorReport::log).ok();
}
Err(e) => {
warn!(
"Can't fetch a task (probably due to db connection loss):\n{}",
source_chain::to_string(&e)
);
sleep(LOST_CONNECTION_SLEEP).await;
util::wait_for_reconnection(&self.db, LOST_CONNECTION_SLEEP).await;
warn!("Task fetching is probably restored");
}
};
self.run_step(task).await.map_err(ErrorReport::log).ok();
}
}

Expand Down Expand Up @@ -151,7 +160,7 @@ impl<S: Step<S>> Worker<S> {
Ok(())
}

/// Waits until the next task is ready, locks it as running and returns it.
/// Waits until the next task is ready, marks it as running and returns it.
async fn recv_task(&self) -> ReportResult<Task> {
trace!("Receiving the next task");

Expand Down

0 comments on commit 4cf5154

Please sign in to comment.