-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This example is the same as the golang version. See `https://github.com/containerd/ttrpc/tree/main/integration/streaming` for details. Signed-off-by: wllenyj <[email protected]>
- Loading branch information
Showing
7 changed files
with
413 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
// Copyright (c) 2020 Ant Financial | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
mod protocols; | ||
mod utils; | ||
|
||
use protocols::r#async::{empty, streaming, streaming_ttrpc}; | ||
use ttrpc::context::{self, Context}; | ||
use ttrpc::r#async::Client; | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
async fn main() { | ||
simple_logging::log_to_stderr(log::LevelFilter::Trace); | ||
//simple_logging::log_to_stderr(log::LevelFilter::Info); | ||
|
||
let c = Client::connect(utils::SOCK_ADDR).unwrap(); | ||
let sc = streaming_ttrpc::StreamingClient::new(c); | ||
|
||
let _now = std::time::Instant::now(); | ||
|
||
let sc1 = sc.clone(); | ||
let t1 = tokio::spawn(echo_request(sc1)); | ||
|
||
let sc1 = sc.clone(); | ||
let t2 = tokio::spawn(echo_stream(sc1)); | ||
|
||
let sc1 = sc.clone(); | ||
let t3 = tokio::spawn(sum_stream(sc1)); | ||
|
||
let sc1 = sc.clone(); | ||
let t4 = tokio::spawn(divide_stream(sc1)); | ||
|
||
let sc1 = sc.clone(); | ||
let t5 = tokio::spawn(echo_null(sc1)); | ||
|
||
let t6 = tokio::spawn(echo_null_stream(sc)); | ||
|
||
let _ = tokio::join!(t1, t2, t3, t4, t5, t6); | ||
} | ||
|
||
fn default_ctx() -> Context { | ||
let mut ctx = context::with_timeout(0); | ||
ctx.add("key-1".to_string(), "value-1-1".to_string()); | ||
ctx.add("key-1".to_string(), "value-1-2".to_string()); | ||
ctx.set("key-2".to_string(), vec!["value-2".to_string()]); | ||
|
||
ctx | ||
} | ||
|
||
async fn echo_request(cli: streaming_ttrpc::StreamingClient) { | ||
let echo1 = streaming::EchoPayload { | ||
seq: 1, | ||
msg: "Echo Me".to_string(), | ||
..Default::default() | ||
}; | ||
let resp = cli.echo(default_ctx(), &echo1).await.unwrap(); | ||
assert_eq!(resp.msg, echo1.msg); | ||
assert_eq!(resp.seq, echo1.seq + 1); | ||
//tokio::time::sleep(std::time::Duration::from_secs(100)).await; | ||
} | ||
|
||
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) { | ||
let mut stream = cli.echo_stream(default_ctx()).await.unwrap(); | ||
|
||
let mut i = 0; | ||
while i < 100 { | ||
let echo = streaming::EchoPayload { | ||
seq: i as u32, | ||
msg: format!("{}: Echo in a stream", i), | ||
..Default::default() | ||
}; | ||
stream.send(&echo).await.unwrap(); | ||
let resp = stream.recv().await.unwrap(); | ||
assert_eq!(resp.msg, echo.msg); | ||
assert_eq!(resp.seq, echo.seq + 1); | ||
|
||
i += 2; | ||
} | ||
tokio::time::sleep(std::time::Duration::from_secs(20)).await; | ||
stream.close_send().await.unwrap(); | ||
let ret = stream.recv().await; | ||
assert!(matches!(ret, Err(ttrpc::Error::EOF))); | ||
} | ||
|
||
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) { | ||
let mut stream = cli.sum_stream(default_ctx()).await.unwrap(); | ||
|
||
let mut sum = streaming::Sum::new(); | ||
stream.send(&streaming::Part::new()).await.unwrap(); | ||
|
||
sum.num += 1; | ||
let mut i = -99i32; | ||
while i <= 100 { | ||
let addi = streaming::Part { | ||
add: i, | ||
..Default::default() | ||
}; | ||
stream.send(&addi).await.unwrap(); | ||
sum.sum += i; | ||
sum.num += 1; | ||
|
||
i += 1; | ||
} | ||
stream.send(&streaming::Part::new()).await.unwrap(); | ||
sum.num += 1; | ||
|
||
let ssum = stream.close_and_recv().await.unwrap(); | ||
assert_eq!(ssum.sum, sum.sum); | ||
assert_eq!(ssum.num, sum.num); | ||
} | ||
|
||
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) { | ||
let expected = streaming::Sum { | ||
sum: 392, | ||
num: 4, | ||
..Default::default() | ||
}; | ||
let mut stream = cli.divide_stream(default_ctx(), &expected).await.unwrap(); | ||
|
||
let mut actual = streaming::Sum::new(); | ||
|
||
// NOTE: `for part in stream.recv().await.unwrap()` can't work. | ||
while let Some(part) = stream.recv().await.unwrap() { | ||
actual.sum += part.add; | ||
actual.num += 1; | ||
} | ||
assert_eq!(actual.sum, expected.sum); | ||
assert_eq!(actual.num, expected.num); | ||
} | ||
|
||
async fn echo_null(cli: streaming_ttrpc::StreamingClient) { | ||
let mut stream = cli.echo_null(default_ctx()).await.unwrap(); | ||
|
||
for i in 0..100 { | ||
let echo = streaming::EchoPayload { | ||
seq: i as u32, | ||
msg: "non-empty empty".to_string(), | ||
..Default::default() | ||
}; | ||
stream.send(&echo).await.unwrap(); | ||
} | ||
let res = stream.close_and_recv().await.unwrap(); | ||
assert_eq!(res, empty::Empty::new()); | ||
} | ||
|
||
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) { | ||
let stream = cli.echo_null_stream(default_ctx()).await.unwrap(); | ||
|
||
let (tx, mut rx) = stream.split(); | ||
|
||
let task = tokio::spawn(async move { | ||
loop { | ||
let ret = rx.recv().await; | ||
if matches!(ret, Err(ttrpc::Error::EOF)) { | ||
break; | ||
} | ||
} | ||
}); | ||
|
||
for i in 0..100 { | ||
let echo = streaming::EchoPayload { | ||
seq: i as u32, | ||
msg: "non-empty empty".to_string(), | ||
..Default::default() | ||
}; | ||
tx.send(&echo).await.unwrap(); | ||
} | ||
|
||
tx.close_send().await.unwrap(); | ||
|
||
tokio::time::timeout(tokio::time::Duration::from_secs(10), task) | ||
.await | ||
.unwrap() | ||
.unwrap(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
// Copyright (c) 2020 Ant Financial | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
mod protocols; | ||
mod utils; | ||
|
||
use std::sync::Arc; | ||
|
||
use log::{info, LevelFilter}; | ||
|
||
use protocols::r#async::{empty, streaming, streaming_ttrpc}; | ||
use ttrpc::asynchronous::Server; | ||
|
||
use async_trait::async_trait; | ||
use tokio::signal::unix::{signal, SignalKind}; | ||
use tokio::time::sleep; | ||
|
||
struct StreamingService; | ||
|
||
#[async_trait] | ||
impl streaming_ttrpc::Streaming for StreamingService { | ||
async fn echo( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
mut e: streaming::EchoPayload, | ||
) -> ::ttrpc::Result<streaming::EchoPayload> { | ||
e.seq += 1; | ||
Ok(e) | ||
} | ||
|
||
async fn echo_stream( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>, | ||
) -> ::ttrpc::Result<()> { | ||
while let Some(mut e) = s.recv().await? { | ||
e.seq += 1; | ||
s.send(&e).await?; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn sum_stream( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::Part>, | ||
) -> ::ttrpc::Result<streaming::Sum> { | ||
let mut sum = streaming::Sum::new(); | ||
while let Some(part) = s.recv().await? { | ||
sum.sum += part.add; | ||
sum.num += 1; | ||
} | ||
|
||
Ok(sum) | ||
} | ||
|
||
async fn divide_stream( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
sum: streaming::Sum, | ||
s: ::ttrpc::r#async::ServerStreamSender<streaming::Part>, | ||
) -> ::ttrpc::Result<()> { | ||
let mut parts = vec![streaming::Part::new(); sum.num as usize]; | ||
|
||
let mut total = 0i32; | ||
for i in 1..(sum.num - 2) { | ||
let add = (rand::random::<u32>() % 1000) as i32 - 500; | ||
parts[i as usize].add = add; | ||
total += add; | ||
} | ||
|
||
parts[sum.num as usize - 2].add = sum.sum - total; | ||
|
||
for part in parts { | ||
s.send(&part).await.unwrap(); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn echo_null( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::EchoPayload>, | ||
) -> ::ttrpc::Result<empty::Empty> { | ||
let mut seq = 0; | ||
while let Some(e) = s.recv().await? { | ||
assert_eq!(e.seq, seq); | ||
assert_eq!(e.msg.as_str(), "non-empty empty"); | ||
seq += 1; | ||
} | ||
Ok(empty::Empty::new()) | ||
} | ||
|
||
async fn echo_null_stream( | ||
&self, | ||
_ctx: &::ttrpc::r#async::TtrpcContext, | ||
s: ::ttrpc::r#async::ServerStream<empty::Empty, streaming::EchoPayload>, | ||
) -> ::ttrpc::Result<()> { | ||
let msg = "non-empty empty".to_string(); | ||
|
||
let mut tasks = Vec::new(); | ||
|
||
let (tx, mut rx) = s.split(); | ||
let mut seq = 0u32; | ||
while let Some(e) = rx.recv().await? { | ||
assert_eq!(e.seq, seq); | ||
assert_eq!(e.msg, msg); | ||
seq += 1; | ||
|
||
for _i in 0..10 { | ||
let tx = tx.clone(); | ||
tasks.push(tokio::spawn( | ||
async move { tx.send(&empty::Empty::new()).await }, | ||
)); | ||
} | ||
} | ||
|
||
for t in tasks { | ||
t.await.unwrap().map_err(|e| { | ||
::ttrpc::Error::RpcStatus(::ttrpc::get_status( | ||
::ttrpc::Code::UNKNOWN, | ||
e.to_string(), | ||
)) | ||
})?; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[tokio::main(flavor = "current_thread")] | ||
async fn main() { | ||
simple_logging::log_to_stderr(LevelFilter::Trace); | ||
//simple_logging::log_to_stderr(LevelFilter::Info); | ||
|
||
let s = Box::new(StreamingService {}) as Box<dyn streaming_ttrpc::Streaming + Send + Sync>; | ||
let s = Arc::new(s); | ||
let service = streaming_ttrpc::create_streaming(s); | ||
|
||
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap(); | ||
|
||
let mut server = Server::new() | ||
.bind(utils::SOCK_ADDR) | ||
.unwrap() | ||
.register_service(service); | ||
|
||
let mut hangup = signal(SignalKind::hangup()).unwrap(); | ||
let mut interrupt = signal(SignalKind::interrupt()).unwrap(); | ||
server.start().await.unwrap(); | ||
|
||
tokio::select! { | ||
_ = hangup.recv() => { | ||
// test stop_listen -> start | ||
info!("stop listen"); | ||
server.stop_listen().await; | ||
info!("start listen"); | ||
server.start().await.unwrap(); | ||
|
||
// hold some time for the new test connection. | ||
sleep(std::time::Duration::from_secs(100)).await; | ||
} | ||
_ = interrupt.recv() => { | ||
// test graceful shutdown | ||
info!("graceful shutdown"); | ||
server.shutdown().await.unwrap(); | ||
} | ||
}; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,3 +10,5 @@ pub mod health; | |
pub mod health_ttrpc; | ||
mod oci; | ||
pub mod types; | ||
pub mod streaming; | ||
pub mod streaming_ttrpc; |
Oops, something went wrong.