Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shared connection for apalis backends #532

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ members = [
"examples/rest-api",
"examples/stepped-tasks",
"examples/retries",
"examples/shared-postgres-conn",
]


Expand Down
12 changes: 6 additions & 6 deletions examples/postgres/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use apalis::layers::retry::RetryPolicy;

use apalis::prelude::*;
use apalis_sql::{
postgres::{PgListen, PgPool, PostgresStorage},
postgres::{PgPool, PostgresStorage},
Config,
};
use email_service::{send_email, Email};
Expand Down Expand Up @@ -38,13 +38,13 @@ async fn main() -> Result<()> {
let mut pg = PostgresStorage::new_with_config(pool.clone(), Config::new("apalis::Email"));
produce_jobs(&mut pg).await?;

let mut listener = PgListen::new(pool).await?;
// let mut listener = PgListen::new(pool).await?;

listener.subscribe_with(&mut pg);
// listener.subscribe_with(&mut pg);

tokio::spawn(async move {
listener.listen().await.unwrap();
});
// tokio::spawn(async move {
// listener.listen().await.unwrap();
// });

Monitor::new()
.register({
Expand Down
23 changes: 23 additions & 0 deletions examples/shared-postgres-conn/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Manifest for the `shared-postgres-conn` example: demonstrates driving
# multiple apalis job types over a single shared Postgres connection.
[package]
name = "shared-postgres-conn"
version = "0.1.0"
edition.workspace = true
repository.workspace = true

[dependencies]
anyhow = "1"
# Workspace-local apalis with retry-layer support used by the example.
apalis = { path = "../../", features = ["retry"] }
apalis-sql = { path = "../../packages/apalis-sql", features = [
    "postgres",
    "tokio-comp",
] }
serde = "1"
tracing-subscriber = "0.3.11"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tokio = { version = "1", features = ["full"] }
email-service = { path = "../email-service" }
# NOTE(review): wildcard version requirements are rejected by crates.io and
# make builds non-reproducible — consider pinning a concrete version.
tower = { version = "*", features = ["buffer"] }

[dependencies.tracing]
default-features = false
version = "0.1"
84 changes: 84 additions & 0 deletions examples/shared-postgres-conn/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::time::Duration;

use anyhow::Result;
use apalis::layers::retry::RetryPolicy;

use apalis::prelude::*;
use apalis_sql::{
postgres::{PgPool, PostgresStorage},
Config,
};
use email_service::{send_email, Email};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

/// Seeds the given storage with ten sample email jobs, then logs how the
/// same can be achieved directly through a SQL query.
async fn produce_jobs(storage: &mut PostgresStorage<Email>) -> Result<()> {
    for i in 0..10 {
        let job = Email {
            to: format!("test{}@example.com", i),
            text: "Test background job from apalis".to_string(),
            subject: "Background email job".to_string(),
        };
        storage.push(job).await?;
    }
    // The sql way
    tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test apalis', 'to', '[email protected]', 'text', 'Lorem Ipsum'));");
    Ok(())
}

/// Entry point demonstrating a single shared Postgres backend serving two
/// distinct job types (`Email` and `Sms`), each consumed by its own worker.
#[tokio::main]
async fn main() -> Result<()> {
    std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
    tracing_subscriber::fmt::init();
    // `unwrap_or_else` keeps the fallback allocation lazy — it only runs
    // when DATABASE_URL is not set.
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/postgres".to_owned());

    let pool = PgPool::connect(&database_url).await?;
    PostgresStorage::setup(&pool)
        .await
        .expect("unable to run migrations for postgres");

    // One underlying backend; every job-type-specific storage below shares
    // its connection instead of opening its own.
    let backend = PostgresStorage::new(pool);

    let mut pg = Shared::new(backend);

    let config = Config::new("apalis::Email")
        .set_poll_interval(Duration::from_secs(100))
        .set_buffer_size(1000);

    let mut email_pg = pg
        .make_shared_with_config(config)
        .expect("failed to create shared email backend");
    produce_jobs(&mut email_pg).await?;

    #[derive(Debug, Serialize, Deserialize, Clone)]
    struct Sms {
        to: String,
        message: String,
    }

    // Second job type over the same shared connection, default config.
    let sms_pg = pg.make_shared().expect("failed to create shared sms backend");

    Monitor::new()
        .register({
            WorkerBuilder::new("tasty-orange")
                .enable_tracing()
                .backend(email_pg)
                .build_fn(send_email)
        })
        .register({
            WorkerBuilder::new("tasty-sms")
                .enable_tracing()
                .backend(sms_pg)
                .build_fn(|sms: Sms| async move {
                    dbg!(sms);
                })
        })
        .on_event(|e| debug!("{e}"))
        .run_with_signal(async {
            tokio::signal::ctrl_c().await?;
            info!("Shutting down the system");
            Ok(())
        })
        .await?;
    Ok(())
}
164 changes: 161 additions & 3 deletions packages/apalis-core/src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
use std::{any::type_name, future::Future};
use std::{
any::type_name,
collections::HashMap,
future::Future,
marker::PhantomData,
pin::Pin,

Check warning on line 6 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `Mutex` and `pin::Pin`

Check warning on line 6 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `Mutex` and `pin::Pin`

Check warning on line 6 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `Mutex` and `pin::Pin`

Check warning on line 6 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `Mutex` and `pin::Pin`
sync::{Arc, Mutex, RwLock},
};

use futures::Stream;
use futures::{future::BoxFuture, FutureExt, Stream};

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `FutureExt`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `future::BoxFuture`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `FutureExt`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `future::BoxFuture`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `FutureExt`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `future::BoxFuture`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `FutureExt`

Check warning on line 10 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `future::BoxFuture`
use serde::{Deserialize, Serialize};

use crate::{
codec::Codec,
error::Error,
notify::Notify,
poller::Poller,
request::State,
request::{BoxStream, Request, RequestStream, State},

Check warning on line 18 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `BoxStream` and `RequestStream`

Check warning on line 18 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `BoxStream` and `RequestStream`

Check warning on line 18 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `BoxStream` and `RequestStream`

Check warning on line 18 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `BoxStream` and `RequestStream`
response::Response,
worker::{Context, Worker},
};

Expand All @@ -29,6 +39,154 @@
fn poll(self, worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer>;
}

pub struct Shared<Backend, Context, Codec: crate::codec::Codec, Config> {

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

type does not implement `Debug`; consider adding `#[derive(Debug)]` or a manual implementation

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

type does not implement `Debug`; consider adding `#[derive(Debug)]` or a manual implementation

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct

Check warning on line 42 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

type does not implement `Debug`; consider adding `#[derive(Debug)]` or a manual implementation
pub instances: Arc<

Check warning on line 43 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 43 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Clippy

very complex type used. Consider factoring parts into `type` definitions

Check warning on line 43 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 43 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
RwLock<HashMap<String, Notify<Result<Option<Request<Codec::Compact, Context>>, Error>>>>,
>,
pub ack: Notify<(Context, Response<Codec::Compact>)>,

Check warning on line 46 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 46 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 46 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
pub backend: Arc<Backend>,

Check warning on line 47 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 47 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 47 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
pub config: PhantomData<Config>,

Check warning on line 48 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Test Suite

missing documentation for a struct field

Check warning on line 48 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field

Check warning on line 48 in packages/apalis-core/src/backend.rs

View workflow job for this annotation

GitHub Actions / Check

missing documentation for a struct field
}

impl<Backend, Context, Codec, Config> Shared<Backend, Context, Codec, Config>
where
    Codec: crate::codec::Codec,
{
    /// Wraps `backend` so that multiple job-type-specific backends can be
    /// derived from it via [`Self::make_shared`] or
    /// [`Self::make_shared_with_config`].
    pub fn new(backend: Backend) -> Self {
        Self {
            instances: Arc::default(),
            ack: Notify::new(),
            backend: Arc::new(backend),
            config: PhantomData,
        }
    }

    /// Derives a new backend of type `B` sharing this connection.
    ///
    /// # Errors
    /// Returns the backend's `MakeError` if a shared instance cannot be
    /// created.
    // NOTE(review): the `Backend::Config: Default` bound is not usable by
    // `Sharable::share` itself (the trait method carries no such bound) —
    // presumably it documents that the default config is used; confirm.
    pub fn make_shared<B>(&mut self) -> Result<B, Backend::MakeError>
    where
        Backend: Sharable<B, Codec, Context = Context, Config = Config>,
        Backend::Config: Default,
    {
        Backend::share(self)
    }

    /// Derives a new backend of type `B` sharing this connection, using the
    /// supplied `config` instead of a default.
    ///
    /// # Errors
    /// Returns the backend's `MakeError` if a shared instance cannot be
    /// created.
    pub fn make_shared_with_config<B>(&mut self, config: Config) -> Result<B, Backend::MakeError>
    where
        Backend: Sharable<B, Codec, Context = Context, Config = Config>,
    {
        Backend::share_with_config(self, config)
    }
}

/// How a backend is connected: either it runs on its own, or it receives
/// requests through a connection shared with other backends.
#[derive(Debug)]
#[non_exhaustive]
pub enum BackendConnection<Compact, Context> {
    /// The backend drives its own connection.
    StandAlone {
        /// Notifier used to trigger the next poll.
        subscription: Notify<()>,
    },
    /// The backend receives requests over a shared connection.
    Shared {
        /// Receiver for requests fetched by the shared connection.
        receiver: Notify<Result<Option<Request<Compact, Context>>, Error>>,
    },
}

impl<Compact, Context> Clone for BackendConnection<Compact, Context> {
fn clone(&self) -> Self {
match self {
BackendConnection::Shared { receiver } => BackendConnection::Shared {
receiver: receiver.clone(),
},
BackendConnection::StandAlone { subscription } => BackendConnection::StandAlone {
subscription: subscription.clone(),
},
}
}
}

/// Trait for backends that can multiplex several job types over one shared
/// connection.
///
/// This can improve performance — for example, with SQL engines it leads to
/// a single query serving multiple job types instead of one query per type.
pub trait Sharable<Backend, Codec: crate::codec::Codec>: Sized {
    /// The context of the Backend being shared
    type Context;
    /// The Config for the backend
    type Config;
    /// The error returned if the backend can't be shared
    type MakeError;

    /// Returns the backend to be shared
    ///
    /// # Errors
    /// Returns [`Self::MakeError`] if a shared instance cannot be created.
    fn share(
        parent: &mut Shared<Self, Self::Context, Codec, Self::Config>,
    ) -> Result<Backend, Self::MakeError>;

    /// Returns the backend with config
    ///
    /// # Errors
    /// Returns [`Self::MakeError`] if a shared instance cannot be created.
    fn share_with_config(
        parent: &mut Shared<Self, Self::Context, Codec, Self::Config>,
        config: Self::Config,
    ) -> Result<Backend, Self::MakeError>;
}

/// Trait for driving the polling loop of a shared backend connection.
pub trait SharedPolling {
    /// The future that performs the polling work.
    type Output: Future<Output = ()>;

    /// Returns a future that, when awaited, drives polling for the shared
    /// connection.
    fn polling(&self) -> Self::Output;
}

// /// A generic connection wrapper that allows backend to be shareable
// // #[derive(Debug)]
// pub enum BackendConnection<Compact, Context> {
// Shared {
// /// The ack notification
// ack: Notify<(Context, Response<Compact>)>,
// /// The shared receiver
// receiver: Notify<Request<Compact, Context>>,
// /// The shared poller that drives the connection.
// /// When all workers are dead, polling stops
// poller: Shared<BoxFuture<'static, ()>>,
// },
// Single {
// /// The ack notification
// ack: Notify<(Context, Response<Compact>)>,
// /// This allows you to provide a stream that triggers the next poll
// ticker: Arc<futures::lock::Mutex<RequestStream<()>>>,
// },
// }

// impl<Compact> std::fmt::Debug for BackendConnection<Compact> {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// match self {
// Connection::Shared { .. } => f
// .debug_struct("Shared")
// .field("ack", &"Notify<(SqlContext, Response<Compact>)>")
// .field("receiver", &"Notify<Request<Args, SqlContext>>")
// .field("poller", &"<future>")
// .finish(),
// Connection::Single { .. } => f
// .debug_struct("Single")
// .field("ack", &"Notify<(SqlContext, Response<Compact>)>")
// .field("ticker", &"RequestStream<()>")
// .finish(),
// }
// }
// }

// impl<Compact, Context> Clone for BackendConnection<Compact, Context> {
// fn clone(&self) -> Self {
// match self {
// BackendConnection::Shared {
// ack,
// receiver,
// poller,
// } => BackendConnection::Shared {
// ack: ack.clone(),
// receiver: receiver.clone(),
// poller: poller.clone(),
// },
// BackendConnection::Single { ack, ticker } => BackendConnection::Single {
// ack: ack.clone(),
// ticker: ticker.clone(),
// },
// }
// }
// }

/// Represents functionality that allows reading of jobs and stats from a backend
/// Some backends esp MessageQueues may not currently implement this
pub trait BackendExpose<T>
Expand Down
24 changes: 24 additions & 0 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,30 @@ pub mod test_utils {
let res = t.len().await.unwrap();
assert_eq!(res, 0, "After vacuuming, there should be nothing");
}
// End-to-end check of the shared-connection path: a backend derived via
// `Shared::make_shared` must still push, consume, and vacuum jobs.
// (Expanded inside the test-utils macro; `$setup` supplies the backend.)
#[tokio::test]
async fn integration_test_shared_push_and_consume() {
    let backend = $setup().await;
    let mut sharable = apalis_core::backend::Shared::new(backend);
    // Echo service: returns the job's argument as its result.
    let service = apalis_test_service_fn(|request: Request<u32, _>| async move {
        Ok::<_, io::Error>(request.args)
    });
    let backend = sharable.make_shared().unwrap();
    let (mut t, poller) = TestWrapper::new_with_service(backend, service);
    tokio::spawn(poller);
    let res = t.len().await.unwrap();
    assert_eq!(res, 0, "There should be no jobs");
    t.push(1).await.unwrap();
    let res = t.len().await.unwrap();
    assert_eq!(res, 1, "There should be 1 job");
    let res = t.execute_next().await.unwrap();
    assert_eq!(res.1, Ok("1".to_owned()));

    // Give the backend time to acknowledge the completed job.
    apalis_core::sleep(Duration::from_secs(1)).await;
    let res = t.len().await.unwrap();
    assert_eq!(res, 0, "There should be no jobs");

    t.vacuum().await.unwrap();
}
};
}
}
21 changes: 20 additions & 1 deletion packages/apalis-core/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{

use futures::{
channel::mpsc::{channel, Receiver, Sender, TrySendError},
Stream, StreamExt,
future::BoxFuture,
FutureExt, Stream, StreamExt,
};

/// The `Notify` struct encapsulates asynchronous, multi-producer, single-consumer (MPSC) channel functionality.
Expand Down Expand Up @@ -55,6 +56,24 @@ impl<T> Notify<T> {
.await
.expect("sender is dropped");
}

/// Pipes a stream into a [`Notify`] instance, returning a driver future
/// alongside the notifier.
///
/// The returned future forwards every item yielded by `stream` into the
/// notifier. It completes when the stream is exhausted, or stops cleanly
/// once the receiving side has been dropped — previously this `unwrap`ped
/// the send result and would panic in that case.
pub fn pipe_stream<S: Stream<Item = T> + Unpin + Send + 'static>(
    mut stream: S,
) -> (BoxFuture<'static, ()>, Notify<T>)
where
    T: Send + Sync + 'static,
{
    let notify = Notify::new();
    let forwarder = notify.clone();
    let poller = async move {
        while let Some(value) = stream.next().await {
            // Receiver gone: stop forwarding instead of panicking.
            if forwarder.notify(value).is_err() {
                break;
            }
        }
    }
    .boxed();
    (poller, notify)
}
}

impl<T> Default for Notify<T> {
Expand Down
Loading
Loading