diff --git a/chimp_chomp/src/jobs.rs b/chimp_chomp/src/jobs.rs index a82a6380..ba500d35 100644 --- a/chimp_chomp/src/jobs.rs +++ b/chimp_chomp/src/jobs.rs @@ -2,6 +2,7 @@ use crate::{ image_loading::{load_image, ChimpImage, WellImage}, postprocessing::Contents, }; +use anyhow::anyhow; use chimp_protocol::{Circle, Job, Response}; use futures::StreamExt; use lapin::{ @@ -15,9 +16,22 @@ use uuid::Uuid; /// Creates a RabbitMQ [`Connection`] with [`Default`] [`lapin::ConnectionProperties`]. /// -/// Returns a [`lapin::Error`] if a connection could not be established. -pub async fn setup_rabbitmq_client(address: Url) -> Result { - lapin::Connection::connect(address.as_str(), lapin::ConnectionProperties::default()).await +/// Returns a [`anyhow::Error`] if the URL could not be built or a connection could not be established. +pub async fn setup_rabbitmq_client( + mut address: Url, + username: Option<&str>, + password: Option<&str>, +) -> Result { + address + .set_username(username.unwrap_or_default()) + .map_err(|_| anyhow!("Could not set username"))?; + address + .set_password(password) + .map_err(|_| anyhow!("Could not set password"))?; + Ok( + lapin::Connection::connect(address.as_str(), lapin::ConnectionProperties::default()) + .await?, + ) } /// Joins a RabbitMQ channel, creating a [`Consumer`] with [`Default`] [`BasicConsumeOptions`] and [`FieldTable`]. diff --git a/chimp_chomp/src/main.rs b/chimp_chomp/src/main.rs index 3083d86e..485a6757 100644 --- a/chimp_chomp/src/main.rs +++ b/chimp_chomp/src/main.rs @@ -37,13 +37,19 @@ use url::Url; struct Cli { /// The URL of the RabbitMQ server. rabbitmq_url: Url, + /// The username used to access the RabbitMQ server. + #[arg(long, env)] + rabbitmq_username: Option, + /// The password used to authenticate with the RabbitMQ server. + #[arg(long, env)] + rabbitmq_password: Option, /// The RabbitMQ channel on which jobs are assigned. rabbitmq_channel: String, /// The duration (in milliseconds) to wait after completing all jobs before shutting down. - #[arg(long)] + #[arg(long, env)] timeout: Option, /// The number of worker threads to use - #[arg(long)] + #[arg(long, env)] threads: Option, } @@ -70,7 +76,13 @@ async fn run(args: Cli) { let input_height = session.inputs[0].dimensions[2].unwrap(); let batch_size = session.inputs[0].dimensions[0].unwrap().try_into().unwrap(); - let rabbitmq_client = setup_rabbitmq_client(args.rabbitmq_url).await.unwrap(); + let rabbitmq_client = setup_rabbitmq_client( + args.rabbitmq_url, + args.rabbitmq_username.as_deref(), + args.rabbitmq_password.as_deref(), + ) + .await + .unwrap(); let job_channel = rabbitmq_client.create_channel().await.unwrap(); let response_channel = rabbitmq_client.create_channel().await.unwrap(); let job_consumer = setup_job_consumer(job_channel, args.rabbitmq_channel)