Skip to content

Commit

Permalink
feat: Job stream now yield as much job as the concurrency option defines
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed May 29, 2024
1 parent 72b924a commit b735462
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
23 changes: 23 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum WorkerBuildError {
}

impl WorkerOptions {
/// Build a new worker with the given options.
/// It migrate the database and fetch the tasks details.
pub async fn init(self) -> Result<Worker, WorkerBuildError> {
let pg_pool = match self.pg_pool {
Some(pg_pool) => pg_pool,
Expand Down Expand Up @@ -96,36 +98,52 @@ impl WorkerOptions {
Ok(worker)
}

/// Set the postgresql schema to use for the worker.
pub fn schema(mut self, value: &str) -> Self {
self.schema = Some(value.into());
self
}

/// Set the number of concurrent jobs that can be run at the same time.
/// Default is the number of logical CPUs in the system.
///
/// # Panics
///
/// Panics if the value is 0.
pub fn concurrency(mut self, value: usize) -> Self {
if value == 0 {
panic!("Concurrency must be greater than 0");
}

self.concurrency = Some(value);
self
}

/// Set the interval at which the worker should poll the database for new jobs.
pub fn poll_interval(mut self, value: Duration) -> Self {
self.poll_interval = Some(value);
self
}

/// Set the postgresql pool to use for the worker.
pub fn pg_pool(mut self, value: PgPool) -> Self {
self.pg_pool = Some(value);
self
}

/// Set the postgresql database url to use for the worker.
pub fn database_url(mut self, value: &str) -> Self {
self.database_url = Some(value.into());
self
}

/// Set the maximum number of postgresql connections to use for the worker.
pub fn max_pg_conn(mut self, value: u32) -> Self {
self.max_pg_conn = Some(value);
self
}

/// Define a job to be run by the worker.
pub fn define_job<T: TaskHandler>(mut self) -> Self {
let identifier = T::IDENTIFIER;

Expand All @@ -139,11 +157,13 @@ impl WorkerOptions {
self
}

/// Adds a forbidden flag to the worker.
pub fn add_forbidden_flag(mut self, flag: &str) -> Self {
self.forbidden_flags.push(flag.into());
self
}

/// Adds a crontab to the worker.
pub fn with_crontab(mut self, input: &str) -> Result<Self, CrontabParseError> {
let mut crontabs = parse_crontab(input)?;
match self.crontabs.as_mut() {
Expand All @@ -155,11 +175,14 @@ impl WorkerOptions {
Ok(self)
}

/// Set whether the worker should use local time or postgresql time.
pub fn use_local_time(mut self, value: bool) -> Self {
self.use_local_time = value;
self
}

/// Add an extension to the worker.
/// Usefull for providing custom app state.
pub fn add_extension<T: Clone + Send + Sync + Debug + 'static>(mut self, value: T) -> Self {
self.extensions.insert(value);
self
Expand Down
5 changes: 3 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use graphile_worker_extensions::ReadOnlyExtensions;
use graphile_worker_job::Job;
use graphile_worker_shutdown_signal::ShutdownSignal;
use thiserror::Error;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

use crate::builder::WorkerOptions;
use crate::sql::complete_job::complete_job;
Expand Down Expand Up @@ -126,6 +126,7 @@ impl Worker {
self.pg_pool.clone(),
self.poll_interval,
self.shutdown_signal.clone(),
self.concurrency,
)
.await?;

Expand Down Expand Up @@ -200,7 +201,7 @@ async fn process_one_job(
}
None => {
// TODO: Retry one time because maybe synchronization issue
debug!(source = ?source, "No job found");
trace!(source = ?source, "No job found");
Ok(None)
}
}
Expand Down
55 changes: 49 additions & 6 deletions src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{num::NonZeroUsize, time::Duration};

use futures::{stream, Stream};
use graphile_worker_shutdown_signal::ShutdownSignal;
Expand All @@ -11,30 +11,73 @@ use crate::{
Job,
};

#[derive(Debug)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum StreamSource {
Polling,
PgListener,
RunOnce,
}

struct JobSignalStreamData {
interval: tokio::time::Interval,
pg_listener: PgListener,
shutdown_signal: ShutdownSignal,
concurrency: usize,
yield_n: Option<(NonZeroUsize, StreamSource)>,
}

impl JobSignalStreamData {
fn new(
interval: tokio::time::Interval,
pg_listener: PgListener,
shutdown_signal: ShutdownSignal,
concurrency: usize,
) -> Self {
JobSignalStreamData {
interval,
pg_listener,
shutdown_signal,
concurrency,
yield_n: None,
}
}
}

/// Returns a stream that yield on postgres `NOTIFY 'jobs:insert'` and on interval specified by the
/// poll_interval argument
pub async fn job_signal_stream(
pg_pool: PgPool,
poll_interval: Duration,
shutdown_signal: ShutdownSignal,
concurrency: usize,
) -> Result<impl Stream<Item = StreamSource>> {
let interval = tokio::time::interval(poll_interval);

let mut pg_listener = PgListener::connect_with(&pg_pool).await?;
pg_listener.listen("jobs:insert").await?;
let stream_data = JobSignalStreamData::new(interval, pg_listener, shutdown_signal, concurrency);
let stream = stream::unfold(stream_data, |mut f| async {
if let Some((n, source)) = f.yield_n.take() {
let source = source.clone();

Check warning on line 61 in src/streams.rs

View workflow job for this annotation

GitHub Actions / clippy

using `clone` on type `StreamSource` which implements the `Copy` trait

warning: using `clone` on type `StreamSource` which implements the `Copy` trait --> src/streams.rs:61:26 | 61 | let source = source.clone(); | ^^^^^^^^^^^^^^ help: try removing the `clone` call: `source` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#clone_on_copy = note: `#[warn(clippy::clone_on_copy)]` on by default
if n.get() > 1 {
let remaining_yields = n.get() - 1;
f.yield_n = Some((NonZeroUsize::new(remaining_yields).unwrap(), source));
} else {
f.yield_n = None;
}
return Some((source, f));
}

let stream = stream::unfold((interval, pg_listener, shutdown_signal), |mut f| async {
tokio::select! {
_ = (f.0).tick() => Some((StreamSource::Polling, f)),
_ = (f.1).recv() => Some((StreamSource::PgListener, f)),
_ = &mut f.2 => None,
_ = (f.interval).tick() => {
f.yield_n = Some((NonZeroUsize::new(f.concurrency).unwrap(), StreamSource::Polling));
Some((StreamSource::Polling, f))
},
_ = (f.pg_listener).recv() => {
f.yield_n = Some((NonZeroUsize::new(f.concurrency).unwrap(), StreamSource::PgListener));
Some((StreamSource::PgListener, f))
},
_ = &mut f.shutdown_signal => None,
}
});

Expand Down

0 comments on commit b735462

Please sign in to comment.