Skip to content

Commit

Permalink
examples: Add example for server_send_stream
Browse files Browse the repository at this point in the history
Add example for server_send_stream to show server how to use stream send
data to client.

Signed-off-by: Hui Zhu <[email protected]>
  • Loading branch information
teawater committed Jan 9, 2025

Verified

This commit was signed with the committer’s verified signature. The key has expired.
tvdeyen Thomas von Deyen
1 parent 4a493e1 commit 7ffb2c1
Showing 3 changed files with 40 additions and 2 deletions.
22 changes: 20 additions & 2 deletions example/async-stream-client.rs
Original file line number Diff line number Diff line change
@@ -44,9 +44,12 @@ async fn main() {
let sc1 = sc.clone();
let t6 = tokio::spawn(echo_null_stream(sc1));

let t7 = tokio::spawn(echo_default_value(sc));
let sc1 = sc.clone();
let t7 = tokio::spawn(echo_default_value(sc1));

let t8 = tokio::spawn(server_send_stream(sc));

let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7);
let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);
}

fn default_ctx() -> Context {
@@ -201,3 +204,18 @@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(received.seq, 0);
assert_eq!(received.msg, "");
}

#[cfg(unix)]
async fn server_send_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli
.server_send_stream(default_ctx(), &Default::default())
.await
.unwrap();

let mut seq = 0;
while let Some(received) = stream.recv().await.unwrap() {
assert_eq!(received.seq, seq);
assert_eq!(received.msg, "hello");
seq += 1;
}
}
19 changes: 19 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
@@ -152,6 +152,25 @@ impl streaming_ttrpc::Streaming for StreamingService {

Ok(())
}

async fn server_send_stream(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
_: empty::Empty,
s: ::ttrpc::r#async::ServerStreamSender<streaming::EchoPayload>,
) -> ::ttrpc::Result<()> {
let mut seq = 0;
while seq < 10 {
sleep(std::time::Duration::from_secs(1)).await;
let mut e = streaming::EchoPayload::new();
e.seq = seq;
e.msg = format!("hello");
s.send(&e).await.unwrap();
seq += 1;
}

Ok(())
}
}

#[cfg(windows)]
1 change: 1 addition & 0 deletions example/protocols/protos/streaming.proto
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ service Streaming {
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
rpc EchoDefaultValue(EchoPayload) returns (stream EchoPayload);
rpc ServerSendStream(google.protobuf.Empty) returns (stream EchoPayload);
}

message EchoPayload {

0 comments on commit 7ffb2c1

Please sign in to comment.