diff --git a/example/Cargo.toml b/example/Cargo.toml index fe9a099f..c873be51 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -22,6 +22,7 @@ ttrpc = { path = "../", features = ["async"] } ctrlc = { version = "3.0", features = ["termination"] } tokio = { version = "1.0.1", features = ["signal", "time"] } async-trait = "0.1.42" +rand = "0.8.5" [[example]] @@ -40,5 +41,13 @@ path = "./async-server.rs" name = "async-client" path = "./async-client.rs" +[[example]] +name = "async-stream-server" +path = "./async-stream-server.rs" + +[[example]] +name = "async-stream-client" +path = "./async-stream-client.rs" + [build-dependencies] ttrpc-codegen = { path = "../ttrpc-codegen"} diff --git a/example/async-stream-client.rs b/example/async-stream-client.rs new file mode 100644 index 00000000..0d7032af --- /dev/null +++ b/example/async-stream-client.rs @@ -0,0 +1,177 @@ +// Copyright (c) 2020 Ant Financial +// +// SPDX-License-Identifier: Apache-2.0 +// + +mod protocols; +mod utils; + +use protocols::r#async::{empty, streaming, streaming_ttrpc}; +use ttrpc::context::{self, Context}; +use ttrpc::r#async::Client; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + simple_logging::log_to_stderr(log::LevelFilter::Trace); + //simple_logging::log_to_stderr(log::LevelFilter::Info); + + let c = Client::connect(utils::SOCK_ADDR).unwrap(); + let sc = streaming_ttrpc::StreamingClient::new(c); + + let _now = std::time::Instant::now(); + + let sc1 = sc.clone(); + let t1 = tokio::spawn(echo_request(sc1)); + + let sc1 = sc.clone(); + let t2 = tokio::spawn(echo_stream(sc1)); + + let sc1 = sc.clone(); + let t3 = tokio::spawn(sum_stream(sc1)); + + let sc1 = sc.clone(); + let t4 = tokio::spawn(divide_stream(sc1)); + + let sc1 = sc.clone(); + let t5 = tokio::spawn(echo_null(sc1)); + + let t6 = tokio::spawn(echo_null_stream(sc)); + + let _ = tokio::join!(t1, t2, t3, t4, t5, t6); +} + +fn default_ctx() -> Context { + let mut ctx = context::with_timeout(0); + ctx.add("key-1".to_string(), "value-1-1".to_string()); + ctx.add("key-1".to_string(), "value-1-2".to_string()); + ctx.set("key-2".to_string(), vec!["value-2".to_string()]); + + ctx +} + +async fn echo_request(cli: streaming_ttrpc::StreamingClient) { + let echo1 = streaming::EchoPayload { + seq: 1, + msg: "Echo Me".to_string(), + ..Default::default() + }; + let resp = cli.echo(default_ctx(), &echo1).await.unwrap(); + assert_eq!(resp.msg, echo1.msg); + assert_eq!(resp.seq, echo1.seq + 1); + //tokio::time::sleep(std::time::Duration::from_secs(100)).await; +} + +async fn echo_stream(cli: streaming_ttrpc::StreamingClient) { + let mut stream = cli.echo_stream(default_ctx()).await.unwrap(); + + let mut i = 0; + while i < 100 { + let echo = streaming::EchoPayload { + seq: i as u32, + msg: format!("{}: Echo in a stream", i), + ..Default::default() + }; + stream.send(&echo).await.unwrap(); + let resp = stream.recv().await.unwrap(); + assert_eq!(resp.msg, echo.msg); + assert_eq!(resp.seq, echo.seq + 1); + + i += 2; + } + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + stream.close_send().await.unwrap(); + let ret = stream.recv().await; + assert!(matches!(ret, Err(ttrpc::Error::EOF))); +} + +async fn sum_stream(cli: streaming_ttrpc::StreamingClient) { + let mut stream = cli.sum_stream(default_ctx()).await.unwrap(); + + let mut sum = streaming::Sum::new(); + stream.send(&streaming::Part::new()).await.unwrap(); + + sum.num += 1; + let mut i = -99i32; + while i <= 100 { + let addi = streaming::Part { + add: i, + ..Default::default() + }; + stream.send(&addi).await.unwrap(); + sum.sum += i; + sum.num += 1; + + i += 1; + } + stream.send(&streaming::Part::new()).await.unwrap(); + sum.num += 1; + + let ssum = stream.close_and_recv().await.unwrap(); + assert_eq!(ssum.sum, sum.sum); + assert_eq!(ssum.num, sum.num); +} + +async fn divide_stream(cli: streaming_ttrpc::StreamingClient) { + let expected = streaming::Sum { + sum: 392, + num: 4, + ..Default::default() + }; + let mut stream = cli.divide_stream(default_ctx(), &expected).await.unwrap(); + + let mut actual = streaming::Sum::new(); + + // NOTE: `for part in stream.recv().await.unwrap()` can't work. + while let Some(part) = stream.recv().await.unwrap() { + actual.sum += part.add; + actual.num += 1; + } + assert_eq!(actual.sum, expected.sum); + assert_eq!(actual.num, expected.num); +} + +async fn echo_null(cli: streaming_ttrpc::StreamingClient) { + let mut stream = cli.echo_null(default_ctx()).await.unwrap(); + + for i in 0..100 { + let echo = streaming::EchoPayload { + seq: i as u32, + msg: "non-empty empty".to_string(), + ..Default::default() + }; + stream.send(&echo).await.unwrap(); + } + let res = stream.close_and_recv().await.unwrap(); + assert_eq!(res, empty::Empty::new()); +} + +async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) { + let stream = cli.echo_null_stream(default_ctx()).await.unwrap(); + + let (tx, mut rx) = stream.split(); + + let task = tokio::spawn(async move { + loop { + let ret = rx.recv().await; + if matches!(ret, Err(ttrpc::Error::EOF)) { + break; + } + } + }); + + for i in 0..100 { + let echo = streaming::EchoPayload { + seq: i as u32, + msg: "non-empty empty".to_string(), + ..Default::default() + }; + tx.send(&echo).await.unwrap(); + } + + tx.close_send().await.unwrap(); + + tokio::time::timeout(tokio::time::Duration::from_secs(10), task) + .await + .unwrap() + .unwrap(); +} diff --git a/example/async-stream-server.rs b/example/async-stream-server.rs new file mode 100644 index 00000000..7d8733f7 --- /dev/null +++ b/example/async-stream-server.rs @@ -0,0 +1,171 @@ +// Copyright (c) 2020 Ant Financial +// +// SPDX-License-Identifier: Apache-2.0 +// + +mod protocols; +mod utils; + +use std::sync::Arc; + +use log::{info, LevelFilter}; + +use protocols::r#async::{empty, streaming, streaming_ttrpc}; +use ttrpc::asynchronous::Server; + +use async_trait::async_trait; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::time::sleep; + +struct StreamingService; + +#[async_trait] +impl streaming_ttrpc::Streaming for StreamingService { + async fn echo( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + mut e: streaming::EchoPayload, + ) -> ::ttrpc::Result { + e.seq += 1; + Ok(e) + } + + async fn echo_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + mut s: ::ttrpc::r#async::ServerStream, + ) -> ::ttrpc::Result<()> { + while let Some(mut e) = s.recv().await? { + e.seq += 1; + s.send(&e).await?; + } + + Ok(()) + } + + async fn sum_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + mut s: ::ttrpc::r#async::ServerStreamReceiver, + ) -> ::ttrpc::Result { + let mut sum = streaming::Sum::new(); + while let Some(part) = s.recv().await? { + sum.sum += part.add; + sum.num += 1; + } + + Ok(sum) + } + + async fn divide_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + sum: streaming::Sum, + s: ::ttrpc::r#async::ServerStreamSender, + ) -> ::ttrpc::Result<()> { + let mut parts = vec![streaming::Part::new(); sum.num as usize]; + + let mut total = 0i32; + for i in 1..(sum.num - 2) { + let add = (rand::random::() % 1000) as i32 - 500; + parts[i as usize].add = add; + total += add; + } + + parts[sum.num as usize - 2].add = sum.sum - total; + + for part in parts { + s.send(&part).await.unwrap(); + } + + Ok(()) + } + + async fn echo_null( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + mut s: ::ttrpc::r#async::ServerStreamReceiver, + ) -> ::ttrpc::Result { + let mut seq = 0; + while let Some(e) = s.recv().await? { + assert_eq!(e.seq, seq); + assert_eq!(e.msg.as_str(), "non-empty empty"); + seq += 1; + } + Ok(empty::Empty::new()) + } + + async fn echo_null_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + s: ::ttrpc::r#async::ServerStream, + ) -> ::ttrpc::Result<()> { + let msg = "non-empty empty".to_string(); + + let mut tasks = Vec::new(); + + let (tx, mut rx) = s.split(); + let mut seq = 0u32; + while let Some(e) = rx.recv().await? { + assert_eq!(e.seq, seq); + assert_eq!(e.msg, msg); + seq += 1; + + for _i in 0..10 { + let tx = tx.clone(); + tasks.push(tokio::spawn( + async move { tx.send(&empty::Empty::new()).await }, + )); + } + } + + for t in tasks { + t.await.unwrap().map_err(|e| { + ::ttrpc::Error::RpcStatus(::ttrpc::get_status( + ::ttrpc::Code::UNKNOWN, + e.to_string(), + )) + })?; + } + Ok(()) + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + simple_logging::log_to_stderr(LevelFilter::Trace); + //simple_logging::log_to_stderr(LevelFilter::Info); + + let s = Box::new(StreamingService {}) as Box; + let s = Arc::new(s); + let service = streaming_ttrpc::create_streaming(s); + + utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap(); + + let mut server = Server::new() + .bind(utils::SOCK_ADDR) + .unwrap() + .register_service(service); + + let mut hangup = signal(SignalKind::hangup()).unwrap(); + let mut interrupt = signal(SignalKind::interrupt()).unwrap(); + server.start().await.unwrap(); + + tokio::select! { + _ = hangup.recv() => { + // test stop_listen -> start + info!("stop listen"); + server.stop_listen().await; + info!("start listen"); + server.start().await.unwrap(); + + // hold some time for the new test connection. + sleep(std::time::Duration::from_secs(100)).await; + } + _ = interrupt.recv() => { + // test graceful shutdown + info!("graceful shutdown"); + server.shutdown().await.unwrap(); + } + }; +} diff --git a/example/build.rs b/example/build.rs index b5550e2b..7ebb9fb5 100644 --- a/example/build.rs +++ b/example/build.rs @@ -9,7 +9,7 @@ use ttrpc_codegen::Codegen; use ttrpc_codegen::Customize; fn main() { - let protos = vec![ + let mut protos = vec![ "protocols/protos/github.com/kata-containers/agent/pkg/types/types.proto", "protocols/protos/agent.proto", "protocols/protos/health.proto", @@ -28,6 +28,9 @@ fn main() { .run() .expect("Gen sync code failed."); + // Only async support stream currently. + protos.push("protocols/protos/streaming.proto"); + Codegen::new() .out_dir("protocols/asynchronous") .inputs(&protos) diff --git a/example/protocols/asynchronous/mod.rs b/example/protocols/asynchronous/mod.rs index fd7082cc..34df78b2 100644 --- a/example/protocols/asynchronous/mod.rs +++ b/example/protocols/asynchronous/mod.rs @@ -10,3 +10,5 @@ pub mod health; pub mod health_ttrpc; mod oci; pub mod types; +pub mod streaming; +pub mod streaming_ttrpc; diff --git a/example/protocols/protos/streaming.proto b/example/protocols/protos/streaming.proto new file mode 100644 index 00000000..fce29dd6 --- /dev/null +++ b/example/protocols/protos/streaming.proto @@ -0,0 +1,49 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +package ttrpc.test.streaming; + +import "google/protobuf/empty.proto"; + +// Shim service is launched for each container and is responsible for owning the IO +// for the container and its additional processes. The shim is also the parent of +// each container and allows reattaching to the IO and receiving the exit status +// for the container processes. + +service Streaming { + rpc Echo(EchoPayload) returns (EchoPayload); + rpc EchoStream(stream EchoPayload) returns (stream EchoPayload); + rpc SumStream(stream Part) returns (Sum); + rpc DivideStream(Sum) returns (stream Part); + rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty); + rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty); +} + +message EchoPayload { + uint32 seq = 1; + string msg = 2; +} + +message Part { + int32 add = 1; +} + +message Sum { + int32 sum = 1; + int32 num = 2; +} diff --git a/ttrpc-codegen/Cargo.toml b/ttrpc-codegen/Cargo.toml index fc6eb32b..2805bcf7 100644 --- a/ttrpc-codegen/Cargo.toml +++ b/ttrpc-codegen/Cargo.toml @@ -16,4 +16,4 @@ readme = "README.md" protobuf = { version = "2.14.0" } protobuf-codegen-pure = "2.14.0" protobuf-codegen = "2.14.0" -ttrpc-compiler = "0.5.0" +ttrpc-compiler = { version = "0.5.0", path = "../compiler" }