Skip to content

Commit

Permalink
fix: timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Oct 23, 2024
1 parent bb4d832 commit 0174cf4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 53 deletions.
30 changes: 0 additions & 30 deletions .cargo/katex-header.html

This file was deleted.

1 change: 1 addition & 0 deletions examples/hello-world-rabbitmq/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ services:
environment:
- RUST_LOG=info
- AMQP_URI=amqp://rabbitmq:5672
- JOB_TIMEOUT=10
worker:
build:
context: ../../
Expand Down
50 changes: 28 additions & 22 deletions examples/hello-world-rabbitmq/leader/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
Expand All @@ -10,53 +11,58 @@ use paladin::{
directive::{indexed_stream::IndexedStream, Directive},
runtime::Runtime,
};
use tracing::info;
use tracing::{error, info, warn};

mod init;

#[derive(Parser, Debug)]
pub struct Cli {
#[command(flatten)]
pub options: Config,
#[arg(long, short)]
/// Optional timeout for job in the seconds
#[arg(long, short, env = "JOB_TIMEOUT")]
pub timeout: Option<u64>,
}

const INPUT: &str = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.";

#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
init::tracing();

let args = Cli::parse();
let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?);

let input: Vec<char> = INPUT.chars().collect();
let computation = IndexedStream::from(input)
.map(&CharToString)
.fold(&StringConcat);

let runtime_ = runtime.clone();
async fn set_abort_timeout_job(timeout: u64, runtime: Arc<Runtime>) {
tokio::spawn(async move {
let command_channel = runtime_
let command_channel = runtime
.get_command_ipc_sender()
.await
.expect("retrieved ipc sender");
println!("Waiting to abort the execution...");
tokio::time::sleep(Duration::from_secs(10)).await;
println!("Aborting the execution...");
tokio::time::sleep(Duration::from_secs(timeout)).await;
warn!("User timeout expired, aborting the execution...");
if let Err(e) = command_channel
.publish(&CommandIpc::Abort {
routing_key: paladin::runtime::COMMAND_IPC_ABORT_ALL_KEY.into(),
})
.await
{
println!("Unable to send abort signal: {e}");
error!("Unable to send abort signal: {e:?}");
} else {
println!("Abort signal successfully sent");
info!("Abort signal successfully sent");
}
});
}

#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();
init::tracing();

let args = Cli::parse();
let runtime = std::sync::Arc::new(Runtime::from_config(&args.options, register()).await?);

let input: Vec<char> = INPUT.chars().collect();
let computation = IndexedStream::from(input)
.map(&CharToString)
.fold(&StringConcat);

if let Some(timeout) = args.timeout {
set_abort_timeout_job(timeout, runtime.clone()).await;
}

let result = computation.run(&runtime).await;
runtime
Expand Down
2 changes: 1 addition & 1 deletion examples/hello-world-rabbitmq/ops/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Operation for CharToString {
.load(std::sync::atomic::Ordering::SeqCst)
{
return Err(OperationError::Fatal {
err: anyhow::anyhow!("aborted on command at CharToString iteration {i} for input {input:?}"),
err: anyhow::anyhow!("aborted per request at CharToString iteration {i} for input {input:?}"),
strategy: FatalStrategy::Terminate,
});
}
Expand Down

0 comments on commit 0174cf4

Please sign in to comment.