Skip to content

Commit

Permalink
[WIP] Rust server
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Nov 9, 2024
1 parent 3e97b32 commit 8f9d6e1
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 54 deletions.
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
"rust-analyzer.cachePriming.enable": true,
"rust-analyzer.linkedProjects": [
"./packages/rust/armonik/Cargo.toml"
],
"rust-analyzer.cargo.features": [
"client",
"server"
]
}
1 change: 1 addition & 0 deletions packages/rust/armonik/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 18 additions & 3 deletions packages/rust/armonik/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ version = "3.21.0-beta-0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["client"]
client = ["dep:rustls", "dep:hyper-rustls", "dep:hyper"]
server = ["tonic/server", "dep:tokio-util", "dep:tokio"]

[dependencies]
tonic = "0.12"
prost = "0.13"
Expand All @@ -19,9 +24,11 @@ futures = "0.3"
async-trait = "0.1"
snafu = "0.8"
tracing = "0.1"
hyper = { version = "1.5", features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.27", features = ["http1", "http2"] }
rustls = { version = "0.23", features = ["ring"] }
hyper = { version = "1.5", features = ["client", "http1", "http2"], optional = true }
hyper-rustls = { version = "0.27", features = ["http1", "http2"], optional = true }
rustls = { version = "0.23", features = ["ring"], optional = true }
tokio-util = { version = "0.7", optional = true}
tokio = { version = "1.41", default-features = false, optional = true }

[dev-dependencies]
eyre = "0.6"
Expand All @@ -39,3 +46,11 @@ tokio = { version = "1.41", features = [

[build-dependencies]
tonic-build = "0.12"

[[example]]
name = "client"
required-features = ["client"]

[[example]]
name = "server"
required-features = ["server"]
95 changes: 49 additions & 46 deletions packages/rust/armonik/build.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,52 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(true)
.build_server(true)
.compile_protos(
&[
"protos/V1/agent_common.proto",
"protos/V1/agent_service.proto",
"protos/V1/applications_common.proto",
"protos/V1/applications_fields.proto",
"protos/V1/applications_filters.proto",
"protos/V1/applications_service.proto",
"protos/V1/auth_common.proto",
"protos/V1/auth_service.proto",
"protos/V1/events_common.proto",
"protos/V1/events_service.proto",
"protos/V1/filters_common.proto",
"protos/V1/objects.proto",
"protos/V1/partitions_common.proto",
"protos/V1/partitions_fields.proto",
"protos/V1/partitions_filters.proto",
"protos/V1/partitions_service.proto",
"protos/V1/result_status.proto",
"protos/V1/results_common.proto",
"protos/V1/results_fields.proto",
"protos/V1/results_filters.proto",
"protos/V1/results_service.proto",
"protos/V1/session_status.proto",
"protos/V1/sessions_common.proto",
"protos/V1/sessions_fields.proto",
"protos/V1/sessions_filters.proto",
"protos/V1/sessions_service.proto",
"protos/V1/sort_direction.proto",
"protos/V1/submitter_common.proto",
"protos/V1/submitter_service.proto",
"protos/V1/task_status.proto",
"protos/V1/tasks_common.proto",
"protos/V1/tasks_fields.proto",
"protos/V1/tasks_filters.proto",
"protos/V1/tasks_service.proto",
"protos/V1/versions_common.proto",
"protos/V1/versions_service.proto",
"protos/V1/worker_common.proto",
"protos/V1/worker_service.proto",
],
&["protos/V1"],
)?;
let builder = tonic_build::configure();

#[cfg(feature = "client")]
let builder = builder.build_client(true);
#[cfg(feature = "server")]
let builder = builder.build_server(true);
builder.compile_protos(
&[
"protos/V1/agent_common.proto",
"protos/V1/agent_service.proto",
"protos/V1/applications_common.proto",
"protos/V1/applications_fields.proto",
"protos/V1/applications_filters.proto",
"protos/V1/applications_service.proto",
"protos/V1/auth_common.proto",
"protos/V1/auth_service.proto",
"protos/V1/events_common.proto",
"protos/V1/events_service.proto",
"protos/V1/filters_common.proto",
"protos/V1/objects.proto",
"protos/V1/partitions_common.proto",
"protos/V1/partitions_fields.proto",
"protos/V1/partitions_filters.proto",
"protos/V1/partitions_service.proto",
"protos/V1/result_status.proto",
"protos/V1/results_common.proto",
"protos/V1/results_fields.proto",
"protos/V1/results_filters.proto",
"protos/V1/results_service.proto",
"protos/V1/session_status.proto",
"protos/V1/sessions_common.proto",
"protos/V1/sessions_fields.proto",
"protos/V1/sessions_filters.proto",
"protos/V1/sessions_service.proto",
"protos/V1/sort_direction.proto",
"protos/V1/submitter_common.proto",
"protos/V1/submitter_service.proto",
"protos/V1/task_status.proto",
"protos/V1/tasks_common.proto",
"protos/V1/tasks_fields.proto",
"protos/V1/tasks_filters.proto",
"protos/V1/tasks_service.proto",
"protos/V1/versions_common.proto",
"protos/V1/versions_service.proto",
"protos/V1/worker_common.proto",
"protos/V1/worker_service.proto",
],
&["protos/V1"],
)?;
Ok(())
}
20 changes: 20 additions & 0 deletions packages/rust/armonik/examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use tracing_subscriber::{prelude::*, EnvFilter};

#[tokio::main]
async fn main() -> Result<(), eyre::Report> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let client = armonik::Client::new().await?;

let session = tokio::time::timeout(
tokio::time::Duration::from_secs(1),
client.sessions().create([""], Default::default()),
)
.await??;

println!("Created session {session} using partition");

Ok(())
}
128 changes: 128 additions & 0 deletions packages/rust/armonik/examples/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing_subscriber::{prelude::*, EnvFilter};

use armonik::server::SessionsServiceExt;
use armonik::sessions;

pub struct Server;

impl armonik::server::SessionsService for Server {
/// Get a sessions list using pagination, filters and sorting.
async fn list(
self: Arc<Self>,
_request: sessions::list::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::list::Response, tonic::Status> {
todo!()
}

/// Get a session by its id.
async fn get(
self: Arc<Self>,
_request: sessions::get::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::get::Response, tonic::Status> {
todo!()
}

/// Cancel a session by its id.
async fn cancel(
self: Arc<Self>,
_request: sessions::cancel::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::cancel::Response, tonic::Status> {
todo!()
}

/// Create a session
async fn create(
self: Arc<Self>,
_request: sessions::create::Request,
cancellation_token: CancellationToken,
) -> std::result::Result<sessions::create::Response, tonic::Status> {
tracing::info!("create called");
if let Some(()) = cancellation_token
.run_until_cancelled(tokio::time::sleep(tokio::time::Duration::from_secs(2)))
.await
{
tracing::info!("create returned");
Ok(sessions::create::Response {
session_id: String::from("abc"),
})
} else {
tracing::info!("client cancelled RPC");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tracing::info!("future still running");
Err(tonic::Status::aborted("client cancelled RPC"))
}
}

/// Pause a session by its id.
async fn pause(
self: Arc<Self>,
_request: sessions::pause::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::pause::Response, tonic::Status> {
todo!()
}

/// Resume a paused session by its id.
async fn resume(
self: Arc<Self>,
_request: sessions::resume::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::resume::Response, tonic::Status> {
todo!()
}

/// Close a session by its id.
async fn close(
self: Arc<Self>,
_request: sessions::close::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::close::Response, tonic::Status> {
todo!()
}

/// Purge a session by its id. Removes Results data.
async fn purge(
self: Arc<Self>,
_request: sessions::purge::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::purge::Response, tonic::Status> {
todo!()
}

/// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session.
async fn delete(
self: Arc<Self>,
_request: sessions::delete::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::delete::Response, tonic::Status> {
todo!()
}

/// Stops clients and/or workers from submitting new tasks in the given session.
async fn stop_submission(
self: Arc<Self>,
_request: sessions::stop_submission::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::stop_submission::Response, tonic::Status> {
todo!()
}
}

#[tokio::main]
pub async fn main() -> Result<(), eyre::Report> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.init();
tonic::transport::Server::builder()
.add_service(Server.sessions_server())
.serve("127.0.0.1:3456".parse()?)
.await?;
Ok(())
}
4 changes: 4 additions & 0 deletions packages/rust/armonik/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
//! Rust bindings for the ArmoniK API
pub mod api;
#[cfg(feature = "client")]
pub mod client;
mod objects;
#[cfg(feature = "server")]
pub mod server;

#[cfg(feature = "client")]
pub use client::{Client, ClientConfig};
pub use objects::*;

Expand Down
47 changes: 47 additions & 0 deletions packages/rust/armonik/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
mod sessions;

pub use sessions::{SessionsService, SessionsServiceExt};

macro_rules! define_trait_methods {
(trait $name:ident {$($(#[$attr:meta])* fn $service:ident::$method:ident ;)* $(--- $($body:tt)*)?}) => {
pub trait $name {
$(
$(#[$attr])*
fn $method(
self: Arc<Self>,
request: $service::$method::Request,
cancellation_token: tokio_util::sync::CancellationToken,
) -> impl std::future::Future<Output = std::result::Result<$service::$method::Response, tonic::Status>> + Send;
)*
$($($body)*)?
}
};
}

macro_rules! impl_trait_methods {
(impl ($name:ty) for Arc<$type:ident> {$(fn $method:ident($request:ty) -> $response:ty {$inner:ident})* $(--- $($body:tt)*)?}) => {
#[async_trait::async_trait]
impl<T: $type + Send + Sync + 'static> $name for Arc<T> {
$(
async fn $method(
&self,
request: tonic::Request<$request>,
) -> std::result::Result<tonic::Response<$response>, tonic::Status> {
let server = self.clone();
let ct = tokio_util::sync::CancellationToken::new();
let _cancel_guard = ct.clone().drop_guard();
let fut = tokio::spawn(async move { server.$inner(request.into_inner().into(), ct).await});
match fut.await {
Ok(Ok(res)) => Ok(tonic::Response::new(res.into())),
Ok(Err(err)) => Err(err),
Err(err) => Err(tonic::Status::internal(err.to_string())),
}
}
)*
$($($body)*)?
}
};
}

use define_trait_methods;
use impl_trait_methods;
Loading

0 comments on commit 8f9d6e1

Please sign in to comment.