From b73546267675822d3d2e467296e0ddc1a54a68c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Coletta?= Date: Wed, 29 May 2024 12:32:36 +0200 Subject: [PATCH] feat: Job stream now yield as much job as the concurrency option defines --- src/builder.rs | 23 +++++++++++++++++++++ src/runner.rs | 5 +++-- src/streams.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 0925897..76afc59 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -46,6 +46,8 @@ pub enum WorkerBuildError { } impl WorkerOptions { + /// Build a new worker with the given options. + /// It migrates the database and fetches the task details. pub async fn init(self) -> Result { let pg_pool = match self.pg_pool { Some(pg_pool) => pg_pool, @@ -96,36 +98,52 @@ impl WorkerOptions { Ok(worker) } + /// Set the postgresql schema to use for the worker. pub fn schema(mut self, value: &str) -> Self { self.schema = Some(value.into()); self } + /// Set the number of concurrent jobs that can be run at the same time. + /// Default is the number of logical CPUs in the system. + /// + /// # Panics + /// + /// Panics if the value is 0. pub fn concurrency(mut self, value: usize) -> Self { + if value == 0 { + panic!("Concurrency must be greater than 0"); + } + self.concurrency = Some(value); self } + /// Set the interval at which the worker should poll the database for new jobs. pub fn poll_interval(mut self, value: Duration) -> Self { self.poll_interval = Some(value); self } + /// Set the postgresql pool to use for the worker. pub fn pg_pool(mut self, value: PgPool) -> Self { self.pg_pool = Some(value); self } + /// Set the postgresql database url to use for the worker. pub fn database_url(mut self, value: &str) -> Self { self.database_url = Some(value.into()); self } + /// Set the maximum number of postgresql connections to use for the worker. 
pub fn max_pg_conn(mut self, value: u32) -> Self { self.max_pg_conn = Some(value); self } + /// Define a job to be run by the worker. pub fn define_job(mut self) -> Self { let identifier = T::IDENTIFIER; @@ -139,11 +157,13 @@ impl WorkerOptions { self } + /// Adds a forbidden flag to the worker. pub fn add_forbidden_flag(mut self, flag: &str) -> Self { self.forbidden_flags.push(flag.into()); self } + /// Adds a crontab to the worker. pub fn with_crontab(mut self, input: &str) -> Result { let mut crontabs = parse_crontab(input)?; match self.crontabs.as_mut() { @@ -155,11 +175,14 @@ impl WorkerOptions { Ok(self) } + /// Set whether the worker should use local time or postgresql time. pub fn use_local_time(mut self, value: bool) -> Self { self.use_local_time = value; self } + /// Add an extension to the worker. + /// Useful for providing custom app state. pub fn add_extension(mut self, value: T) -> Self { self.extensions.insert(value); self diff --git a/src/runner.rs b/src/runner.rs index 5a357e6..370a146 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -17,7 +17,7 @@ use graphile_worker_extensions::ReadOnlyExtensions; use graphile_worker_job::Job; use graphile_worker_shutdown_signal::ShutdownSignal; use thiserror::Error; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::builder::WorkerOptions; use crate::sql::complete_job::complete_job; @@ -126,6 +126,7 @@ impl Worker { self.pg_pool.clone(), self.poll_interval, self.shutdown_signal.clone(), + self.concurrency, ) .await?; @@ -200,7 +201,7 @@ async fn process_one_job( } None => { // TODO: Retry one time because maybe synchronization issue - debug!(source = ?source, "No job found"); + trace!(source = ?source, "No job found"); Ok(None) } } diff --git a/src/streams.rs b/src/streams.rs index bf09986..597eb7e 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{num::NonZeroUsize, time::Duration}; use futures::{stream, 
Stream}; use graphile_worker_shutdown_signal::ShutdownSignal; @@ -11,30 +11,73 @@ use crate::{ Job, }; -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum StreamSource { Polling, PgListener, RunOnce, } +struct JobSignalStreamData { + interval: tokio::time::Interval, + pg_listener: PgListener, + shutdown_signal: ShutdownSignal, + concurrency: usize, + yield_n: Option<(NonZeroUsize, StreamSource)>, +} + +impl JobSignalStreamData { + fn new( + interval: tokio::time::Interval, + pg_listener: PgListener, + shutdown_signal: ShutdownSignal, + concurrency: usize, + ) -> Self { + JobSignalStreamData { + interval, + pg_listener, + shutdown_signal, + concurrency, + yield_n: None, + } + } +} + /// Returns a stream that yield on postgres `NOTIFY 'jobs:insert'` and on interval specified by the /// poll_interval argument pub async fn job_signal_stream( pg_pool: PgPool, poll_interval: Duration, shutdown_signal: ShutdownSignal, + concurrency: usize, ) -> Result> { let interval = tokio::time::interval(poll_interval); let mut pg_listener = PgListener::connect_with(&pg_pool).await?; pg_listener.listen("jobs:insert").await?; + let stream_data = JobSignalStreamData::new(interval, pg_listener, shutdown_signal, concurrency); + let stream = stream::unfold(stream_data, |mut f| async { + if let Some((n, source)) = f.yield_n.take() { + let source = source.clone(); + if n.get() > 1 { + let remaining_yields = n.get() - 1; + f.yield_n = Some((NonZeroUsize::new(remaining_yields).unwrap(), source)); + } else { + f.yield_n = None; + } + return Some((source, f)); + } - let stream = stream::unfold((interval, pg_listener, shutdown_signal), |mut f| async { tokio::select! 
{ - _ = (f.0).tick() => Some((StreamSource::Polling, f)), - _ = (f.1).recv() => Some((StreamSource::PgListener, f)), - _ = &mut f.2 => None, + _ = (f.interval).tick() => { + f.yield_n = Some((NonZeroUsize::new(f.concurrency).unwrap(), StreamSource::Polling)); + Some((StreamSource::Polling, f)) + }, + _ = (f.pg_listener).recv() => { + f.yield_n = Some((NonZeroUsize::new(f.concurrency).unwrap(), StreamSource::PgListener)); + Some((StreamSource::PgListener, f)) + }, + _ = &mut f.shutdown_signal => None, } });