diff --git a/rust/Cargo.lock b/rust/Cargo.lock index e3d90e2f0..3eddaaa6c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1708,6 +1708,7 @@ dependencies = [ "backoff", "numaflow-core", "numaflow-models", + "rustls 0.23.19", "servesink", "serving", "tokio", diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index 4a98303a1..33c7e1cbe 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" [features] nats-tests = [] pulsar-tests = [] -all-tests = ["nats-tests", "pulsar-tests"] +redis-tests = [] +all-tests = ["nats-tests", "pulsar-tests", "redis-tests"] [lints] workspace = true diff --git a/rust/numaflow-core/src/source/serving.rs b/rust/numaflow-core/src/source/serving.rs index b9fb6c72e..431cfbba3 100644 --- a/rust/numaflow-core/src/source/serving.rs +++ b/rust/numaflow-core/src/source/serving.rs @@ -139,6 +139,7 @@ mod tests { } } + #[cfg(feature = "redis-tests")] #[tokio::test] async fn test_serving_source_reader_acker() -> Result<()> { let settings = Settings { @@ -146,6 +147,10 @@ mod tests { ..Default::default() }; let settings = Arc::new(settings); + // Setup the CryptoProvider (controls core cryptography used by rustls) for the process + // ServingSource starts an Axum HTTPS server in the background. Rustls is used to generate + // self-signed certs when starting the server. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let mut serving_source = ServingSource::new( Arc::clone(&settings), 10, diff --git a/rust/numaflow/Cargo.toml b/rust/numaflow/Cargo.toml index 6d5fc0dd6..58de96a9c 100644 --- a/rust/numaflow/Cargo.toml +++ b/rust/numaflow/Cargo.toml @@ -14,4 +14,5 @@ numaflow-models.workspace = true backoff.workspace = true tokio.workspace = true tracing.workspace = true +rustls.workspace = true tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } \ No newline at end of file diff --git a/rust/numaflow/src/main.rs b/rust/numaflow/src/main.rs index 9a5ab6fe8..e0836ce21 100644 --- a/rust/numaflow/src/main.rs +++ b/rust/numaflow/src/main.rs @@ -19,6 +19,12 @@ async fn main() -> Result<(), Box> { ) .with(tracing_subscriber::fmt::layer().with_ansi(false)) .init(); + + // Setup the CryptoProvider (controls core cryptography used by rustls) for the process + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("Installing default CryptoProvider"); + if let Err(e) = run().await { error!("{e:?}"); return Err(e); diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index 6f61a0530..eb083d57e 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -32,7 +32,6 @@ use crate::{app::callback::state, Message, MessageWrapper}; // "from_vertex": "a" // } -const CALLBACK_URL_KEY: &str = "X-Numaflow-Callback-Url"; const NUMAFLOW_RESP_ARRAY_LEN: &str = "Numaflow-Array-Len"; const NUMAFLOW_RESP_ARRAY_IDX_LEN: &str = "Numaflow-Array-Index-Len"; @@ -40,7 +39,6 @@ struct ProxyState { message: mpsc::Sender, tid_header: String, callback: state::State, - callback_url: String, } pub(crate) async fn jetstream_proxy( @@ -50,10 +48,6 @@ pub(crate) async fn jetstream_proxy( message: state.message.clone(), tid_header: state.settings.tid_header.clone(), callback: state.callback_state.clone(), - callback_url: format!( - "https://{}:{}/v1/process/callback", - state.settings.host_ip, state.settings.app_listen_port - ), }); let router = Router::new() diff --git a/rust/serving/src/lib.rs b/rust/serving/src/lib.rs index 001065ddf..bdc3aeab9 100644 --- a/rust/serving/src/lib.rs +++ b/rust/serving/src/lib.rs @@ -39,9 +39,6 @@ pub(crate) async fn serve( where T: Clone + Send + Sync + Store + 'static, { - // Setup the CryptoProvider (controls core cryptography used by rustls) for the process - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let (cert, key) = generate_certs()?; let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into()) diff --git a/rust/serving/src/pipeline.rs b/rust/serving/src/pipeline.rs index cb491d7d8..cc0c2298b 100644 --- a/rust/serving/src/pipeline.rs +++ b/rust/serving/src/pipeline.rs @@ -65,7 +65,7 @@ pub(crate) struct Edge { /// DCG (directed compute graph) of the pipeline with minimal information build using vertices and edges /// from the pipeline spec #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] -pub(crate) struct PipelineDCG { +pub struct PipelineDCG { pub(crate) vertices: Vec, pub(crate) edges: Vec, }