diff --git a/.vscode/settings.json b/.vscode/settings.json index 15c632896..1e9c5e111 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,5 +16,9 @@ "rust-analyzer.cachePriming.enable": true, "rust-analyzer.linkedProjects": [ "./packages/rust/armonik/Cargo.toml" + ], + "rust-analyzer.cargo.features": [ + "client", + "server" ] } diff --git a/packages/rust/armonik/Cargo.lock b/packages/rust/armonik/Cargo.lock index 5ffcdc1e8..d670bb124 100644 --- a/packages/rust/armonik/Cargo.lock +++ b/packages/rust/armonik/Cargo.lock @@ -50,6 +50,7 @@ dependencies = [ "serial_test", "snafu", "tokio", + "tokio-util", "tonic", "tonic-build", "tracing", diff --git a/packages/rust/armonik/Cargo.toml b/packages/rust/armonik/Cargo.toml index 4754f4f30..263583f5b 100644 --- a/packages/rust/armonik/Cargo.toml +++ b/packages/rust/armonik/Cargo.toml @@ -11,6 +11,11 @@ version = "3.21.0-beta-0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["client"] +client = ["dep:rustls", "dep:hyper-rustls", "dep:hyper"] +server = ["tonic/server", "dep:tokio-util", "dep:tokio"] + [dependencies] tonic = "0.12" prost = "0.13" @@ -19,9 +24,11 @@ futures = "0.3" async-trait = "0.1" snafu = "0.8" tracing = "0.1" -hyper = { version = "1.5", features = ["client", "http1", "http2"] } -hyper-rustls = { version = "0.27", features = ["http1", "http2"] } -rustls = { version = "0.23", features = ["ring"] } +hyper = { version = "1.5", features = ["client", "http1", "http2"], optional = true } +hyper-rustls = { version = "0.27", features = ["http1", "http2"], optional = true } +rustls = { version = "0.23", features = ["ring"], optional = true } +tokio-util = { version = "0.7", optional = true} +tokio = { version = "1.41", default-features = false, optional = true } [dev-dependencies] eyre = "0.6" @@ -39,3 +46,11 @@ tokio = { version = "1.41", features = [ [build-dependencies] tonic-build = "0.12" + +[[example]] +name = "client" +required-features = ["client"] + +[[example]] +name = "server" +required-features = ["server"] diff --git a/packages/rust/armonik/build.rs b/packages/rust/armonik/build.rs index 801f43a43..82e7f5d68 100644 --- a/packages/rust/armonik/build.rs +++ b/packages/rust/armonik/build.rs @@ -1,49 +1,52 @@ fn main() -> Result<(), Box> { - tonic_build::configure() - .build_client(true) - .build_server(true) - .compile_protos( - &[ - "protos/V1/agent_common.proto", - "protos/V1/agent_service.proto", - "protos/V1/applications_common.proto", - "protos/V1/applications_fields.proto", - "protos/V1/applications_filters.proto", - "protos/V1/applications_service.proto", - "protos/V1/auth_common.proto", - "protos/V1/auth_service.proto", - "protos/V1/events_common.proto", - "protos/V1/events_service.proto", - "protos/V1/filters_common.proto", - "protos/V1/objects.proto", - "protos/V1/partitions_common.proto", - "protos/V1/partitions_fields.proto", - "protos/V1/partitions_filters.proto", - "protos/V1/partitions_service.proto", - "protos/V1/result_status.proto", - "protos/V1/results_common.proto", - "protos/V1/results_fields.proto", - "protos/V1/results_filters.proto", - "protos/V1/results_service.proto", - "protos/V1/session_status.proto", - "protos/V1/sessions_common.proto", - "protos/V1/sessions_fields.proto", - "protos/V1/sessions_filters.proto", - "protos/V1/sessions_service.proto", - "protos/V1/sort_direction.proto", - "protos/V1/submitter_common.proto", - "protos/V1/submitter_service.proto", - "protos/V1/task_status.proto", - "protos/V1/tasks_common.proto", - "protos/V1/tasks_fields.proto", - "protos/V1/tasks_filters.proto", - "protos/V1/tasks_service.proto", - "protos/V1/versions_common.proto", - "protos/V1/versions_service.proto", - "protos/V1/worker_common.proto", - "protos/V1/worker_service.proto", - ], - &["protos/V1"], - )?; + let builder = tonic_build::configure(); + + #[cfg(feature = "client")] + let builder = builder.build_client(true); + #[cfg(feature = "server")] + let builder = builder.build_server(true); + builder.compile_protos( + &[ + "protos/V1/agent_common.proto", + "protos/V1/agent_service.proto", + "protos/V1/applications_common.proto", + "protos/V1/applications_fields.proto", + "protos/V1/applications_filters.proto", + "protos/V1/applications_service.proto", + "protos/V1/auth_common.proto", + "protos/V1/auth_service.proto", + "protos/V1/events_common.proto", + "protos/V1/events_service.proto", + "protos/V1/filters_common.proto", + "protos/V1/objects.proto", + "protos/V1/partitions_common.proto", + "protos/V1/partitions_fields.proto", + "protos/V1/partitions_filters.proto", + "protos/V1/partitions_service.proto", + "protos/V1/result_status.proto", + "protos/V1/results_common.proto", + "protos/V1/results_fields.proto", + "protos/V1/results_filters.proto", + "protos/V1/results_service.proto", + "protos/V1/session_status.proto", + "protos/V1/sessions_common.proto", + "protos/V1/sessions_fields.proto", + "protos/V1/sessions_filters.proto", + "protos/V1/sessions_service.proto", + "protos/V1/sort_direction.proto", + "protos/V1/submitter_common.proto", + "protos/V1/submitter_service.proto", + "protos/V1/task_status.proto", + "protos/V1/tasks_common.proto", + "protos/V1/tasks_fields.proto", + "protos/V1/tasks_filters.proto", + "protos/V1/tasks_service.proto", + "protos/V1/versions_common.proto", + "protos/V1/versions_service.proto", + "protos/V1/worker_common.proto", + "protos/V1/worker_service.proto", + ], + &["protos/V1"], + )?; Ok(()) } diff --git a/packages/rust/armonik/examples/client.rs b/packages/rust/armonik/examples/client.rs new file mode 100644 index 000000000..0e4c27d43 --- /dev/null +++ b/packages/rust/armonik/examples/client.rs @@ -0,0 +1,20 @@ +use tracing_subscriber::{prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() -> Result<(), eyre::Report> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + let client = armonik::Client::new().await?; + + let session = tokio::time::timeout( + tokio::time::Duration::from_secs(1), + client.sessions().create([""], Default::default()), + ) + .await??; + + println!("Created session {session} using partition"); + + Ok(()) +} diff --git a/packages/rust/armonik/examples/server.rs b/packages/rust/armonik/examples/server.rs new file mode 100644 index 000000000..3465b4286 --- /dev/null +++ b/packages/rust/armonik/examples/server.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use armonik::server::SessionsServiceExt; +use armonik::sessions; + +pub struct Server; + +impl armonik::server::SessionsService for Server { + /// Get a sessions list using pagination, filters and sorting. + async fn list( + self: Arc, + _request: sessions::list::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Get a session by its id. + async fn get( + self: Arc, + _request: sessions::get::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Cancel a session by its id. + async fn cancel( + self: Arc, + _request: sessions::cancel::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Create a session + async fn create( + self: Arc, + _request: sessions::create::Request, + cancellation_token: CancellationToken, + ) -> std::result::Result { + tracing::info!("create called"); + if let Some(()) = cancellation_token + .run_until_cancelled(tokio::time::sleep(tokio::time::Duration::from_secs(2))) + .await + { + tracing::info!("create returned"); + Ok(sessions::create::Response { + session_id: String::from("abc"), + }) + } else { + tracing::info!("client cancelled RPC"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tracing::info!("future still running"); + Err(tonic::Status::aborted("client cancelled RPC")) + } + } + + /// Pause a session by its id. + async fn pause( + self: Arc, + _request: sessions::pause::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Resume a paused session by its id. + async fn resume( + self: Arc, + _request: sessions::resume::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Close a session by its id. + async fn close( + self: Arc, + _request: sessions::close::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Purge a session by its id. Removes Results data. + async fn purge( + self: Arc, + _request: sessions::purge::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session. + async fn delete( + self: Arc, + _request: sessions::delete::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Stops clients and/or workers from submitting new tasks in the given session. + async fn stop_submission( + self: Arc, + _request: sessions::stop_submission::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } +} + +#[tokio::main] +pub async fn main() -> Result<(), eyre::Report> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + tonic::transport::Server::builder() + .add_service(Server.sessions_server()) + .serve("127.0.0.1:3456".parse()?) + .await?; + Ok(()) +} diff --git a/packages/rust/armonik/src/lib.rs b/packages/rust/armonik/src/lib.rs index 594140db4..ee698ac77 100644 --- a/packages/rust/armonik/src/lib.rs +++ b/packages/rust/armonik/src/lib.rs @@ -1,9 +1,13 @@ //! Rust bindings for the ArmoniK API pub mod api; +#[cfg(feature = "client")] pub mod client; mod objects; +#[cfg(feature = "server")] +pub mod server; +#[cfg(feature = "client")] pub use client::{Client, ClientConfig}; pub use objects::*; diff --git a/packages/rust/armonik/src/server/mod.rs b/packages/rust/armonik/src/server/mod.rs new file mode 100644 index 000000000..73eaf16db --- /dev/null +++ b/packages/rust/armonik/src/server/mod.rs @@ -0,0 +1,47 @@ +mod sessions; + +pub use sessions::{SessionsService, SessionsServiceExt}; + +macro_rules! define_trait_methods { + (trait $name:ident {$($(#[$attr:meta])* fn $service:ident::$method:ident ;)* $(--- $($body:tt)*)?}) => { + pub trait $name { + $( + $(#[$attr])* + fn $method( + self: Arc, + request: $service::$method::Request, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> impl std::future::Future> + Send; + )* + $($($body)*)? + } + }; +} + +macro_rules! impl_trait_methods { + (impl ($name:ty) for Arc<$type:ident> {$(fn $method:ident($request:ty) -> $response:ty {$inner:ident})* $(--- $($body:tt)*)?}) => { + #[async_trait::async_trait] + impl $name for Arc { + $( + async fn $method( + &self, + request: tonic::Request<$request>, + ) -> std::result::Result, tonic::Status> { + let server = self.clone(); + let ct = tokio_util::sync::CancellationToken::new(); + let _cancel_guard = ct.clone().drop_guard(); + let fut = tokio::spawn(async move { server.$inner(request.into_inner().into(), ct).await}); + match fut.await { + Ok(Ok(res)) => Ok(tonic::Response::new(res.into())), + Ok(Err(err)) => Err(err), + Err(err) => Err(tonic::Status::internal(err.to_string())), + } + } + )* + $($($body)*)? + } + }; +} + +use define_trait_methods; +use impl_trait_methods; diff --git a/packages/rust/armonik/src/server/sessions.rs b/packages/rust/armonik/src/server/sessions.rs new file mode 100644 index 000000000..464896142 --- /dev/null +++ b/packages/rust/armonik/src/server/sessions.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use crate::api::v3; +use crate::sessions; + +super::define_trait_methods!{ + trait SessionsService { + /// Get a sessions list using pagination, filters and sorting. + fn sessions::list; + + /// Get a session by its id. + fn sessions::get; + + /// Cancel a session by its id. + fn sessions::cancel; + + /// Create a session + fn sessions::create; + + /// Pause a session by its id. + fn sessions::pause; + + /// Resume a paused session by its id. + fn sessions::resume; + + /// Close a session by its id. + fn sessions::close; + + /// Purge a session by its id. Removes Results data. + fn sessions::purge; + + /// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session. + fn sessions::delete; + + /// Stops clients and/or workers from submitting new tasks in the given session. + fn sessions::stop_submission; + } +} + +pub trait SessionsServiceExt { + fn sessions_server(self) -> v3::sessions::sessions_server::SessionsServer> where Self: Sized; +} + +impl SessionsServiceExt for T { + fn sessions_server(self) -> v3::sessions::sessions_server::SessionsServer> { + v3::sessions::sessions_server::SessionsServer::new(Arc::new(self)) + } +} + + +super::impl_trait_methods!{ + impl (v3::sessions::sessions_server::Sessions) for Arc { + fn list_sessions(v3::sessions::ListSessionsRequest) -> v3::sessions::ListSessionsResponse { list } + fn get_session(v3::sessions::GetSessionRequest) -> v3::sessions::GetSessionResponse { get } + fn cancel_session(v3::sessions::CancelSessionRequest) -> v3::sessions::CancelSessionResponse { cancel } + fn create_session(v3::sessions::CreateSessionRequest) -> v3::sessions::CreateSessionReply { create } + fn pause_session(v3::sessions::PauseSessionRequest) -> v3::sessions::PauseSessionResponse { pause } + fn resume_session(v3::sessions::ResumeSessionRequest) -> v3::sessions::ResumeSessionResponse { resume } + fn close_session(v3::sessions::CloseSessionRequest) -> v3::sessions::CloseSessionResponse { close } + fn purge_session(v3::sessions::PurgeSessionRequest) -> v3::sessions::PurgeSessionResponse { purge } + fn delete_session(v3::sessions::DeleteSessionRequest) -> v3::sessions::DeleteSessionResponse { delete } + fn stop_submission(v3::sessions::StopSubmissionRequest) -> v3::sessions::StopSubmissionResponse { stop_submission } + } +} diff --git a/packages/rust/armonik/src/utils.rs b/packages/rust/armonik/src/utils.rs index 3ecaf2ea5..6278988cc 100644 --- a/packages/rust/armonik/src/utils.rs +++ b/packages/rust/armonik/src/utils.rs @@ -69,6 +69,7 @@ pub enum ReadEnvError { #[derive(Debug)] pub(crate) struct InsecureCertVerifier; +#[cfg(feature = "client")] impl rustls::client::danger::ServerCertVerifier for InsecureCertVerifier { fn verify_server_cert( &self, @@ -118,11 +119,6 @@ impl rustls::client::danger::ServerCertVerifier for InsecureCertVerifier { } } -struct Foo {} -struct Bar(Vec); - -impl_vec_wrapper!(Bar(Foo)); - /// Implement all traits and functions to define a wrapper around a [`Vec`] /// /// # Examples