Skip to content

Commit

Permalink
Better waiter
Browse files Browse the repository at this point in the history
  • Loading branch information
imbolc committed Jun 26, 2024
1 parent 0e6bc52 commit 92c2f1a
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 134 deletions.
2 changes: 2 additions & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
wrap_comments = true
imports_granularity = "Crate"
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ version = "0.0.7"
async-trait = "0.1"
chrono = { version = "0.4", features = ["std", "serde"] }
code-path = "0.3"
displaydoc = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
source-chain = "0.1"
Expand Down
24 changes: 5 additions & 19 deletions examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use chrono::{DateTime, Utc};
use pg_task::{NextStep, Step, StepResult};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::{env, time::Duration};
use std::time::Duration;

mod util;

// It wraps the task step into an enum which proxies necessary methods
pg_task::task!(Count {
Expand All @@ -17,14 +19,13 @@ pg_task::scheduler!(Tasks { Count });

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let db = connect().await?;
init_logging()?;
let db = util::init().await?;

// Let's schedule a few tasks
pg_task::enqueue(&db, &Tasks::Count(Start { up_to: 1000 }.into())).await?;

// And run a worker
pg_task::Worker::<Tasks>::new(db).run().await;
pg_task::Worker::<Tasks>::new(db).run().await?;

Ok(())
}
Expand Down Expand Up @@ -103,18 +104,3 @@ fn num_seconds(duration: chrono::Duration) -> f64 {
let nanos = duration.num_nanoseconds().unwrap() % 1_000_000_000;
seconds as f64 + (nanos as f64 / 1_000_000_000.0)
}

async fn connect() -> anyhow::Result<sqlx::PgPool> {
dotenv::dotenv().ok();
let db = PgPool::connect(&env::var("DATABASE_URL")?).await?;
sqlx::migrate!().run(&db).await?;
Ok(db)
}

fn init_logging() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish();
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
35 changes: 15 additions & 20 deletions examples/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,47 @@ use async_trait::async_trait;
use pg_task::{NextStep, Step, StepResult};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::{env, time::Duration};
use std::time::Duration;

mod util;

// It wraps the task step into an enum which proxies necessary methods
pg_task::task!(FooBar { Foo, Bar });
pg_task::task!(Sleeper { Sleep, Wakeup });

// Also we need a enum representing all the possible tasks
pg_task::scheduler!(Tasks { FooBar });
pg_task::scheduler!(Tasks { Sleeper });

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let db = connect().await?;
let db = util::init().await?;

// Let's schedule a few tasks
for delay in [3, 1, 2] {
pg_task::enqueue(&db, &Tasks::FooBar(Foo(delay).into())).await?;
pg_task::enqueue(&db, &Tasks::Sleeper(Sleep(delay).into())).await?;
}

// And run a worker
pg_task::Worker::<Tasks>::new(db).run().await;
pg_task::Worker::<Tasks>::new(db).run().await?;

Ok(())
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Foo(u64);
pub struct Sleep(u64);
#[async_trait]
impl Step<FooBar> for Foo {
async fn step(self, _db: &PgPool) -> StepResult<FooBar> {
impl Step<Sleeper> for Sleep {
async fn step(self, _db: &PgPool) -> StepResult<Sleeper> {
println!("Sleeping for {} sec", self.0);
NextStep::delay(Bar(self.0), Duration::from_secs(self.0))
NextStep::delay(Wakeup(self.0), Duration::from_secs(self.0))
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct Bar(u64);
pub struct Wakeup(u64);
#[async_trait]
impl Step<FooBar> for Bar {
async fn step(self, _db: &PgPool) -> StepResult<FooBar> {
impl Step<Sleeper> for Wakeup {
async fn step(self, _db: &PgPool) -> StepResult<Sleeper> {
println!("Woke up after {} sec", self.0);
NextStep::none()
}
}

async fn connect() -> anyhow::Result<sqlx::PgPool> {
dotenv::dotenv().ok();
let db = PgPool::connect(&env::var("DATABASE_URL")?).await?;
sqlx::migrate!().run(&db).await?;
Ok(db)
}
27 changes: 27 additions & 0 deletions examples/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Result;
use sqlx::PgPool;
use tracing_subscriber::{EnvFilter, FmtSubscriber};

pub async fn init() -> Result<PgPool> {
dotenv::dotenv().ok();
init_logging()?;
connect().await
}

async fn connect() -> Result<PgPool> {
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
sqlx::migrate!().run(&db).await?;
Ok(db)
}

fn init_logging() -> Result<()> {
let subscriber = FmtSubscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.finish();
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}

// Make `cagro check --examples` happy
#[allow(dead_code)]
fn main() {}
15 changes: 9 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::NextStep;
use std::{error::Error as StdError, result::Result as StdResult};
use tracing::error;

/// The crate error
#[derive(Debug, thiserror::Error)]
#[derive(Debug, displaydoc::Display, thiserror::Error)]
pub enum Error {
/// Db error during task saving
#[error("add task")]
/// can't add task
AddTask(#[source] sqlx::Error),
/// Task serialization issue
#[error("serialize step")]
/// can't serialize step
SerializeStep(#[source] serde_json::Error),
/// can't unlock stale tasks
UnlockStaleTasks(#[source] sqlx::Error),
/// waiter can't connect to the db
WaiterConnect(#[source] sqlx::Error),
/// waiter can't start listening to tables changes
WaiterListen(#[source] sqlx::Error),
}

/// The crate result
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod macros;
mod next_step;
mod traits;
mod util;
mod waiter;
mod worker;

pub use error::{Error, Result, StepError, StepResult};
Expand Down
16 changes: 15 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/// Converts a chrono duration to std, it uses absolute value of the chrono duration
/// Converts a chrono duration to std, it uses absolute value of the chrono
/// duration
pub fn chrono_duration_to_std(chrono_duration: chrono::Duration) -> std::time::Duration {
let seconds = chrono_duration.num_seconds();
let nanos = chrono_duration.num_nanoseconds().unwrap_or(0) % 1_000_000_000;
Expand All @@ -9,3 +10,16 @@ pub fn chrono_duration_to_std(chrono_duration: chrono::Duration) -> std::time::D
pub fn std_duration_to_chrono(std_duration: std::time::Duration) -> chrono::Duration {
chrono::Duration::from_std(std_duration).unwrap_or_else(|_| chrono::Duration::max_value())
}

/// Returns the ordinal string of a given integer
pub fn ordinal(n: i32) -> String {
match n.abs() {
11..=13 => format!("{}-th", n),
_ => match n % 10 {
1 => format!("{}-st", n),
2 => format!("{}-nd", n),
3 => format!("{}-rd", n),
_ => format!("{}-th", n),
},
}
}
65 changes: 65 additions & 0 deletions src/waiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use sqlx::{postgres::PgListener, PgPool};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{futures::Notified, Notify},
time::timeout,
};
use tracing::trace;

/// Waits for tasks table to change
pub struct Waiter(Arc<Notify>);
pub struct Subscription<'a>(Notified<'a>);

const PG_NOTIFICATION_CHANNEL: &str = "pg_task_changed";

impl Waiter {
/// Creates a waiter
pub fn new() -> Self {
let notify = Arc::new(Notify::new());
Self(notify)
}

/// Connects to the db and starts to listen to tasks table changes
pub async fn listen(&self, db: PgPool) -> crate::Result<()> {
let mut listener = PgListener::connect_with(&db)
.await
.map_err(crate::Error::WaiterListen)?;
listener
.listen(PG_NOTIFICATION_CHANNEL)
.await
.map_err(crate::Error::WaiterListen)?;

let notify = self.0.clone();
tokio::spawn(async move {
loop {
listener.recv().await.unwrap();
notify.notify_waiters();
}
});
Ok(())
}

/// Subscribes for notifications.
///
/// Awaiting on the result ends on the first notification after the
/// subscription, even if it happens between the subscription and awaiting.
pub fn subscribe(&self) -> Subscription<'_> {
Subscription(self.0.notified())
}
}

impl<'a> Subscription<'a> {
pub async fn wait_for(self, period: Duration) {
trace!("⌛Waiting for the tasks table to change for {period:?}");
match timeout(period, self.0).await {
Ok(_) => trace!("⚡The tasks table has changed"),
Err(_) => trace!("⏰The waiting timeout has expired"),
}
}

pub async fn wait_forever(self) {
trace!("⌛Waiting for the tasks table to change");
self.0.await;
trace!("⚡The tasks table has changed");
}
}
Loading

0 comments on commit 92c2f1a

Please sign in to comment.