Skip to content

Commit

Permalink
example: add streaming example
Browse files Browse the repository at this point in the history
This example is the same as the golang version.
See
`https://github.com/containerd/ttrpc/tree/main/integration/streaming`
for details.

Signed-off-by: wllenyj <[email protected]>
  • Loading branch information
wllenyj committed Jun 6, 2022
1 parent 9ebe640 commit bda9829
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 2 deletions.
9 changes: 9 additions & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ttrpc = { path = "../", features = ["async"] }
ctrlc = { version = "3.0", features = ["termination"] }
tokio = { version = "1.0.1", features = ["signal", "time"] }
async-trait = "0.1.42"
rand = "0.8.5"


[[example]]
Expand All @@ -40,5 +41,13 @@ path = "./async-server.rs"
name = "async-client"
path = "./async-client.rs"

[[example]]
name = "async-stream-server"
path = "./async-stream-server.rs"

[[example]]
name = "async-stream-client"
path = "./async-stream-client.rs"

[build-dependencies]
ttrpc-codegen = { path = "../ttrpc-codegen"}
177 changes: 177 additions & 0 deletions example/async-stream-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2022 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

mod protocols;
mod utils;

use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
use ttrpc::r#async::Client;

#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(log::LevelFilter::Trace);
//simple_logging::log_to_stderr(log::LevelFilter::Info);

let c = Client::connect(utils::SOCK_ADDR).unwrap();
let sc = streaming_ttrpc::StreamingClient::new(c);

let _now = std::time::Instant::now();

let sc1 = sc.clone();
let t1 = tokio::spawn(echo_request(sc1));

let sc1 = sc.clone();
let t2 = tokio::spawn(echo_stream(sc1));

let sc1 = sc.clone();
let t3 = tokio::spawn(sum_stream(sc1));

let sc1 = sc.clone();
let t4 = tokio::spawn(divide_stream(sc1));

let sc1 = sc.clone();
let t5 = tokio::spawn(echo_null(sc1));

let t6 = tokio::spawn(echo_null_stream(sc));

let _ = tokio::join!(t1, t2, t3, t4, t5, t6);
}

fn default_ctx() -> Context {
let mut ctx = context::with_timeout(0);
ctx.add("key-1".to_string(), "value-1-1".to_string());
ctx.add("key-1".to_string(), "value-1-2".to_string());
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);

ctx
}

async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
let echo1 = streaming::EchoPayload {
seq: 1,
msg: "Echo Me".to_string(),
..Default::default()
};
let resp = cli.echo(default_ctx(), &echo1).await.unwrap();
assert_eq!(resp.msg, echo1.msg);
assert_eq!(resp.seq, echo1.seq + 1);
//tokio::time::sleep(std::time::Duration::from_secs(100)).await;
}

async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

let mut i = 0;
while i < 100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: format!("{}: Echo in a stream", i),
..Default::default()
};
stream.send(&echo).await.unwrap();
let resp = stream.recv().await.unwrap();
assert_eq!(resp.msg, echo.msg);
assert_eq!(resp.seq, echo.seq + 1);

i += 2;
}
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
stream.close_send().await.unwrap();
let ret = stream.recv().await;
assert!(matches!(ret, Err(ttrpc::Error::EOF)));
}

async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();

let mut sum = streaming::Sum::new();
stream.send(&streaming::Part::new()).await.unwrap();

sum.num += 1;
let mut i = -99i32;
while i <= 100 {
let addi = streaming::Part {
add: i,
..Default::default()
};
stream.send(&addi).await.unwrap();
sum.sum += i;
sum.num += 1;

i += 1;
}
stream.send(&streaming::Part::new()).await.unwrap();
sum.num += 1;

let ssum = stream.close_and_recv().await.unwrap();
assert_eq!(ssum.sum, sum.sum);
assert_eq!(ssum.num, sum.num);
}

async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
let expected = streaming::Sum {
sum: 392,
num: 4,
..Default::default()
};
let mut stream = cli.divide_stream(default_ctx(), &expected).await.unwrap();

let mut actual = streaming::Sum::new();

// NOTE: `for part in stream.recv().await.unwrap()` can't work.
while let Some(part) = stream.recv().await.unwrap() {
actual.sum += part.add;
actual.num += 1;
}
assert_eq!(actual.sum, expected.sum);
assert_eq!(actual.num, expected.num);
}

async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_null(default_ctx()).await.unwrap();

for i in 0..100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: "non-empty empty".to_string(),
..Default::default()
};
stream.send(&echo).await.unwrap();
}
let res = stream.close_and_recv().await.unwrap();
assert_eq!(res, empty::Empty::new());
}

async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();

let (tx, mut rx) = stream.split();

let task = tokio::spawn(async move {
loop {
let ret = rx.recv().await;
if matches!(ret, Err(ttrpc::Error::EOF)) {
break;
}
}
});

for i in 0..100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: "non-empty empty".to_string(),
..Default::default()
};
tx.send(&echo).await.unwrap();
}

tx.close_send().await.unwrap();

tokio::time::timeout(tokio::time::Duration::from_secs(10), task)
.await
.unwrap()
.unwrap();
}
171 changes: 171 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2022 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

mod protocols;
mod utils;

use std::sync::Arc;

use log::{info, LevelFilter};

use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::asynchronous::Server;

use async_trait::async_trait;
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct StreamingService;

#[async_trait]
impl streaming_ttrpc::Streaming for StreamingService {
async fn echo(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut e: streaming::EchoPayload,
) -> ::ttrpc::Result<streaming::EchoPayload> {
e.seq += 1;
Ok(e)
}

async fn echo_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>,
) -> ::ttrpc::Result<()> {
while let Some(mut e) = s.recv().await? {
e.seq += 1;
s.send(&e).await?;
}

Ok(())
}

async fn sum_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::Part>,
) -> ::ttrpc::Result<streaming::Sum> {
let mut sum = streaming::Sum::new();
while let Some(part) = s.recv().await? {
sum.sum += part.add;
sum.num += 1;
}

Ok(sum)
}

async fn divide_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
sum: streaming::Sum,
s: ::ttrpc::r#async::ServerStreamSender<streaming::Part>,
) -> ::ttrpc::Result<()> {
let mut parts = vec![streaming::Part::new(); sum.num as usize];

let mut total = 0i32;
for i in 1..(sum.num - 2) {
let add = (rand::random::<u32>() % 1000) as i32 - 500;
parts[i as usize].add = add;
total += add;
}

parts[sum.num as usize - 2].add = sum.sum - total;

for part in parts {
s.send(&part).await.unwrap();
}

Ok(())
}

async fn echo_null(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::EchoPayload>,
) -> ::ttrpc::Result<empty::Empty> {
let mut seq = 0;
while let Some(e) = s.recv().await? {
assert_eq!(e.seq, seq);
assert_eq!(e.msg.as_str(), "non-empty empty");
seq += 1;
}
Ok(empty::Empty::new())
}

async fn echo_null_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
s: ::ttrpc::r#async::ServerStream<empty::Empty, streaming::EchoPayload>,
) -> ::ttrpc::Result<()> {
let msg = "non-empty empty".to_string();

let mut tasks = Vec::new();

let (tx, mut rx) = s.split();
let mut seq = 0u32;
while let Some(e) = rx.recv().await? {
assert_eq!(e.seq, seq);
assert_eq!(e.msg, msg);
seq += 1;

for _i in 0..10 {
let tx = tx.clone();
tasks.push(tokio::spawn(
async move { tx.send(&empty::Empty::new()).await },
));
}
}

for t in tasks {
t.await.unwrap().map_err(|e| {
::ttrpc::Error::RpcStatus(::ttrpc::get_status(
::ttrpc::Code::UNKNOWN,
e.to_string(),
))
})?;
}
Ok(())
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);
//simple_logging::log_to_stderr(LevelFilter::Info);

let s = Box::new(StreamingService {}) as Box<dyn streaming_ttrpc::Streaming + Send + Sync>;
let s = Arc::new(s);
let service = streaming_ttrpc::create_streaming(s);

utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

let mut server = Server::new()
.bind(utils::SOCK_ADDR)
.unwrap()
.register_service(service);

let mut hangup = signal(SignalKind::hangup()).unwrap();
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
server.start().await.unwrap();

tokio::select! {
_ = hangup.recv() => {
// test stop_listen -> start
info!("stop listen");
server.stop_listen().await;
info!("start listen");
server.start().await.unwrap();

// hold some time for the new test connection.
sleep(std::time::Duration::from_secs(100)).await;
}
_ = interrupt.recv() => {
// test graceful shutdown
info!("graceful shutdown");
server.shutdown().await.unwrap();
}
};
}
5 changes: 4 additions & 1 deletion example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ttrpc_codegen::Codegen;
use ttrpc_codegen::Customize;

fn main() {
let protos = vec![
let mut protos = vec![
"protocols/protos/github.com/kata-containers/agent/pkg/types/types.proto",
"protocols/protos/agent.proto",
"protocols/protos/health.proto",
Expand All @@ -28,6 +28,9 @@ fn main() {
.run()
.expect("Gen sync code failed.");

// Only async support stream currently.
protos.push("protocols/protos/streaming.proto");

Codegen::new()
.out_dir("protocols/asynchronous")
.inputs(&protos)
Expand Down
2 changes: 2 additions & 0 deletions example/protocols/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub mod health;
pub mod health_ttrpc;
mod oci;
pub mod types;
pub mod streaming;
pub mod streaming_ttrpc;
Loading

0 comments on commit bda9829

Please sign in to comment.