Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming support. #146

Merged
merged 15 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 193 additions & 122 deletions compiler/src/codegen.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ unlicensed = "deny"
allow = [
"MIT",
"Apache-2.0",
"Unicode-DFS-2016",
#"Apache-2.0 WITH LLVM-exception",
]
# List of explictly disallowed licenses
Expand Down
9 changes: 9 additions & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ttrpc = { path = "../", features = ["async"] }
ctrlc = { version = "3.0", features = ["termination"] }
tokio = { version = "1.0.1", features = ["signal", "time"] }
async-trait = "0.1.42"
rand = "0.8.5"


[[example]]
Expand All @@ -40,5 +41,13 @@ path = "./async-server.rs"
name = "async-client"
path = "./async-client.rs"

[[example]]
name = "async-stream-server"
path = "./async-stream-server.rs"

[[example]]
name = "async-stream-client"
path = "./async-stream-client.rs"

[build-dependencies]
ttrpc-codegen = { path = "../ttrpc-codegen"}
2 changes: 2 additions & 0 deletions example/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ build:
cargo build --example client
cargo build --example async-server
cargo build --example async-client
cargo build --example async-stream-server
cargo build --example async-stream-client

.PHONY: deps
deps:
Expand Down
174 changes: 174 additions & 0 deletions example/async-stream-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2022 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

mod protocols;
mod utils;

use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
use ttrpc::r#async::Client;

#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(log::LevelFilter::Info);

let c = Client::connect(utils::SOCK_ADDR).unwrap();
let sc = streaming_ttrpc::StreamingClient::new(c);

let _now = std::time::Instant::now();

let sc1 = sc.clone();
let t1 = tokio::spawn(echo_request(sc1));

let sc1 = sc.clone();
let t2 = tokio::spawn(echo_stream(sc1));

let sc1 = sc.clone();
let t3 = tokio::spawn(sum_stream(sc1));

let sc1 = sc.clone();
let t4 = tokio::spawn(divide_stream(sc1));

let sc1 = sc.clone();
let t5 = tokio::spawn(echo_null(sc1));

let t6 = tokio::spawn(echo_null_stream(sc));

let _ = tokio::join!(t1, t2, t3, t4, t5, t6);
}

fn default_ctx() -> Context {
let mut ctx = context::with_timeout(0);
ctx.add("key-1".to_string(), "value-1-1".to_string());
ctx.add("key-1".to_string(), "value-1-2".to_string());
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);

ctx
}

async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
let echo1 = streaming::EchoPayload {
seq: 1,
msg: "Echo Me".to_string(),
..Default::default()
};
let resp = cli.echo(default_ctx(), &echo1).await.unwrap();
assert_eq!(resp.msg, echo1.msg);
assert_eq!(resp.seq, echo1.seq + 1);
}

async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

let mut i = 0;
while i < 100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: format!("{}: Echo in a stream", i),
..Default::default()
};
stream.send(&echo).await.unwrap();
let resp = stream.recv().await.unwrap();
assert_eq!(resp.msg, echo.msg);
assert_eq!(resp.seq, echo.seq + 1);

i += 2;
}
stream.close_send().await.unwrap();
let ret = stream.recv().await;
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
}

async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();

let mut sum = streaming::Sum::new();
stream.send(&streaming::Part::new()).await.unwrap();

sum.num += 1;
let mut i = -99i32;
while i <= 100 {
let addi = streaming::Part {
add: i,
..Default::default()
};
stream.send(&addi).await.unwrap();
sum.sum += i;
sum.num += 1;

i += 1;
}
stream.send(&streaming::Part::new()).await.unwrap();
sum.num += 1;

let ssum = stream.close_and_recv().await.unwrap();
assert_eq!(ssum.sum, sum.sum);
assert_eq!(ssum.num, sum.num);
}

async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
let expected = streaming::Sum {
sum: 392,
num: 4,
..Default::default()
};
let mut stream = cli.divide_stream(default_ctx(), &expected).await.unwrap();

let mut actual = streaming::Sum::new();

// NOTE: `for part in stream.recv().await.unwrap()` can't work.
while let Some(part) = stream.recv().await.unwrap() {
actual.sum += part.add;
actual.num += 1;
}
assert_eq!(actual.sum, expected.sum);
assert_eq!(actual.num, expected.num);
}

async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_null(default_ctx()).await.unwrap();

for i in 0..100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: "non-empty empty".to_string(),
..Default::default()
};
stream.send(&echo).await.unwrap();
}
let res = stream.close_and_recv().await.unwrap();
assert_eq!(res, empty::Empty::new());
}

async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();

let (tx, mut rx) = stream.split();

let task = tokio::spawn(async move {
loop {
let ret = rx.recv().await;
if matches!(ret, Err(ttrpc::Error::Eof)) {
break;
}
}
});

for i in 0..100 {
let echo = streaming::EchoPayload {
seq: i as u32,
msg: "non-empty empty".to_string(),
..Default::default()
};
tx.send(&echo).await.unwrap();
}

tx.close_send().await.unwrap();

tokio::time::timeout(tokio::time::Duration::from_secs(10), task)
.await
.unwrap()
.unwrap();
}
170 changes: 170 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2022 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

mod protocols;
mod utils;

use std::sync::Arc;

use log::{info, LevelFilter};

use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::asynchronous::Server;

use async_trait::async_trait;
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct StreamingService;

#[async_trait]
impl streaming_ttrpc::Streaming for StreamingService {
async fn echo(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut e: streaming::EchoPayload,
) -> ::ttrpc::Result<streaming::EchoPayload> {
e.seq += 1;
Ok(e)
}

async fn echo_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>,
) -> ::ttrpc::Result<()> {
while let Some(mut e) = s.recv().await? {
e.seq += 1;
s.send(&e).await?;
}

Ok(())
}

async fn sum_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::Part>,
) -> ::ttrpc::Result<streaming::Sum> {
let mut sum = streaming::Sum::new();
while let Some(part) = s.recv().await? {
sum.sum += part.add;
sum.num += 1;
}

Ok(sum)
}

async fn divide_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
sum: streaming::Sum,
s: ::ttrpc::r#async::ServerStreamSender<streaming::Part>,
) -> ::ttrpc::Result<()> {
let mut parts = vec![streaming::Part::new(); sum.num as usize];

let mut total = 0i32;
for i in 1..(sum.num - 2) {
let add = (rand::random::<u32>() % 1000) as i32 - 500;
parts[i as usize].add = add;
total += add;
}

parts[sum.num as usize - 2].add = sum.sum - total;

for part in parts {
s.send(&part).await.unwrap();
}

Ok(())
}

async fn echo_null(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::EchoPayload>,
) -> ::ttrpc::Result<empty::Empty> {
let mut seq = 0;
while let Some(e) = s.recv().await? {
assert_eq!(e.seq, seq);
assert_eq!(e.msg.as_str(), "non-empty empty");
seq += 1;
}
Ok(empty::Empty::new())
}

async fn echo_null_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
s: ::ttrpc::r#async::ServerStream<empty::Empty, streaming::EchoPayload>,
) -> ::ttrpc::Result<()> {
let msg = "non-empty empty".to_string();

let mut tasks = Vec::new();

let (tx, mut rx) = s.split();
let mut seq = 0u32;
while let Some(e) = rx.recv().await? {
assert_eq!(e.seq, seq);
assert_eq!(e.msg, msg);
seq += 1;

for _i in 0..10 {
let tx = tx.clone();
tasks.push(tokio::spawn(
async move { tx.send(&empty::Empty::new()).await },
));
}
}

for t in tasks {
t.await.unwrap().map_err(|e| {
::ttrpc::Error::RpcStatus(::ttrpc::get_status(
::ttrpc::Code::UNKNOWN,
e.to_string(),
))
})?;
}
Ok(())
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Info);

let s = Box::new(StreamingService {}) as Box<dyn streaming_ttrpc::Streaming + Send + Sync>;
let s = Arc::new(s);
let service = streaming_ttrpc::create_streaming(s);

utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

let mut server = Server::new()
.bind(utils::SOCK_ADDR)
.unwrap()
.register_service(service);

let mut hangup = signal(SignalKind::hangup()).unwrap();
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
server.start().await.unwrap();

tokio::select! {
_ = hangup.recv() => {
// test stop_listen -> start
info!("stop listen");
server.stop_listen().await;
info!("start listen");
server.start().await.unwrap();

// hold some time for the new test connection.
sleep(std::time::Duration::from_secs(100)).await;
}
_ = interrupt.recv() => {
// test graceful shutdown
info!("graceful shutdown");
server.shutdown().await.unwrap();
}
};
}
Loading