Skip to content

Commit

Permalink
[WIP] Server tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Nov 14, 2024
1 parent fc975bd commit 5af4524
Show file tree
Hide file tree
Showing 15 changed files with 971 additions and 176 deletions.
1 change: 1 addition & 0 deletions packages/rust/armonik/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions packages/rust/armonik/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ hyper-util = { version = "0.1", features = ["client", "http1"] }
http-body-util = "0.1"
serde_json = "1.0"
serial_test = "3.1"
async-stream = "0.3"
tokio = { version = "1.41", features = [
"rt-multi-thread",
"macros",
Expand All @@ -58,10 +59,10 @@ tokio = { version = "1.41", features = [
[build-dependencies]
tonic-build = "0.12"

[[example]]
name = "client"
required-features = ["client"]
[[test]]
name = "sessions"
required-features = ["client", "server"]

[[example]]
name = "server"
required-features = ["server"]
[[test]]
name = "agent"
required-features = ["client", "server"]
20 changes: 0 additions & 20 deletions packages/rust/armonik/examples/client.rs

This file was deleted.

128 changes: 0 additions & 128 deletions packages/rust/armonik/examples/server.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ impl From<Response> for v3::agent::CreateResultsMetaDataResponse {
fn from(value: Response) -> Self {
Self {
communication_token: value.communication_token,
results: value.results.into_values().map(Into::into).collect(),
results: value
.results
.into_iter()
.map(|(k, v)| {
debug_assert_eq!(k, v.name);
v.into()
})
.collect(),
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions packages/rust/armonik/src/server/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ impl<T: EventsService + Send + Sync + 'static> EventsServiceExt for T {

#[crate::reexports::async_trait]
impl<T: EventsService + Send + Sync + 'static> v3::events::events_server::Events for T {
type GetEventsStream = crate::reexports::tokio_stream::wrappers::ReceiverStream<
Result<v3::events::EventSubscriptionResponse, tonic::Status>,
>;
type GetEventsStream = crate::server::ServerStream<v3::events::EventSubscriptionResponse>;
async fn get_events(
self: Arc<Self>,
request: tonic::Request<v3::events::EventSubscriptionRequest>,
Expand Down
32 changes: 26 additions & 6 deletions packages/rust/armonik/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ mod events;
mod partitions;
mod results;
mod sessions;
mod submitter;
mod tasks;
mod versions;
mod worker;
mod submitter;

pub use agent::{AgentService, AgentServiceExt};
pub use applications::{ApplicationsService, ApplicationsServiceExt};
Expand All @@ -17,10 +17,10 @@ pub use events::{EventsService, EventsServiceExt};
pub use partitions::{PartitionsService, PartitionsServiceExt};
pub use results::{ResultsService, ResultsServiceExt};
pub use sessions::{SessionsService, SessionsServiceExt};
pub use submitter::{SubmitterService, SubmitterServiceExt};
pub use tasks::{TasksService, TasksServiceExt};
pub use versions::{VersionsService, VersionsServiceExt};
pub use worker::{WorkerService, WorkerServiceExt};
pub use submitter::{SubmitterService, SubmitterServiceExt};

macro_rules! define_trait_methods {
(trait $name:ident {$($(#[$attr:meta])* fn $service:ident::$method:ident ;)* $(--- $($body:tt)*)?}) => {
Expand Down Expand Up @@ -56,7 +56,7 @@ macro_rules! impl_trait_methods {
(unary ($self:ident, $request:ident) { $inner:path }) => {
{
let ct = tokio_util::sync::CancellationToken::new();
let _cancel_guard = ct.clone().drop_guard();
let _drop_guard = ct.clone().drop_guard();
let fut = tokio::spawn(async move { $inner($self, $request.into_inner().into(), ct).await});
match fut.await {
Ok(Ok(res)) => Ok(tonic::Response::new(res.into())),
Expand All @@ -68,7 +68,7 @@ macro_rules! impl_trait_methods {
(stream client ($self:ident, $request:ident) { $inner:path }) => {
{
let ct = tokio_util::sync::CancellationToken::new();
let _cancel_guard = ct.clone().drop_guard();
let _drop_guard = ct.clone().drop_guard();
let fut = tokio::spawn(async move {
$inner(
$self,
Expand All @@ -86,7 +86,7 @@ macro_rules! impl_trait_methods {
(stream server ($self:ident, $request:ident) { $inner:path }) => {
{
let ct = tokio_util::sync::CancellationToken::new();
let _cancel_guard = ct.clone().drop_guard();
let drop_guard = ct.clone().drop_guard();
let fut = tokio::spawn(async move { $inner($self, $request.into_inner().into(), ct).await });
match fut.await {
Ok(Ok(stream)) => {
Expand All @@ -100,7 +100,10 @@ macro_rules! impl_trait_methods {
});

Ok(tonic::Response::new(
crate::reexports::tokio_stream::wrappers::ReceiverStream::new(rx),
crate::server::ServerStream{
receiver: rx,
drop_guard,
},
))
}
Ok(Err(err)) => Err(err),
Expand All @@ -110,5 +113,22 @@ macro_rules! impl_trait_methods {
};
}

pub struct ServerStream<T> {
receiver: tokio::sync::mpsc::Receiver<Result<T, tonic::Status>>,
#[allow(unused)]
drop_guard: tokio_util::sync::DropGuard,
}

impl<T> crate::reexports::tokio_stream::Stream for ServerStream<T> {
type Item = Result<T, tonic::Status>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}

use define_trait_methods;
use impl_trait_methods;
8 changes: 2 additions & 6 deletions packages/rust/armonik/src/server/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ super::impl_trait_methods! {
crate::server::impl_trait_methods!(stream client (self, request) {ResultsService::upload})
}

type DownloadResultDataStream = crate::reexports::tokio_stream::wrappers::ReceiverStream<
Result<v3::results::DownloadResultDataResponse, tonic::Status>,
>;
type DownloadResultDataStream = crate::server::ServerStream<v3::results::DownloadResultDataResponse>;
async fn download_result_data(
self: std::sync::Arc<Self>,
request: tonic::Request<v3::results::DownloadResultDataRequest>,
Expand All @@ -100,9 +98,7 @@ super::impl_trait_methods! {
super::impl_trait_methods!(stream server (self, request) {ResultsService::download})
}

type WatchResultsStream = crate::reexports::tokio_stream::wrappers::ReceiverStream<
Result<v3::results::WatchResultResponse, tonic::Status>,
>;
type WatchResultsStream = crate::server::ServerStream<v3::results::WatchResultResponse>;
async fn watch_results(
self: std::sync::Arc<Self>,
_request: tonic::Request<tonic::Streaming<v3::results::WatchResultRequest>>,
Expand Down
8 changes: 2 additions & 6 deletions packages/rust/armonik/src/server/submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ super::impl_trait_methods! {
}


type TryGetResultStreamStream = crate::reexports::tokio_stream::wrappers::ReceiverStream<
Result<v3::submitter::ResultReply, tonic::Status>,
>;
type TryGetResultStreamStream = crate::server::ServerStream<v3::submitter::ResultReply>;
async fn try_get_result_stream(
self: std::sync::Arc<Self>,
request: tonic::Request<v3::ResultRequest>,
Expand All @@ -103,9 +101,7 @@ super::impl_trait_methods! {
}


type WatchResultsStream = crate::reexports::tokio_stream::wrappers::ReceiverStream<
Result<v3::submitter::WatchResultStream, tonic::Status>,
>;
type WatchResultsStream = crate::server::ServerStream<v3::submitter::WatchResultStream>;
async fn watch_results(
self: std::sync::Arc<Self>,
_request: tonic::Request<tonic::Streaming<v3::submitter::WatchResultRequest>>,
Expand Down
Loading

0 comments on commit 5af4524

Please sign in to comment.