From 092d7682ec5624c1745ca8b0ce2c2ed909fc8303 Mon Sep 17 00:00:00 2001 From: Alexandru Matei Date: Thu, 11 Apr 2024 12:42:11 +0300 Subject: [PATCH 01/11] fix hang in wait_all_exit self.wait() without await returns a Future, so the notified() object is not created until we await on the returned future. That means notify_waiters() can be called before notified() is. This leads to notified() waiting forever because notify_waiters is called only once, when the last waiter is dropped. notify_waiters() and notified() form a happens-before relationship. There are two possible scenarios: 1. If notified() comes before notify_waiters() this means we can safely await on notified(). 2. If notified() comes after notify_waiters() this means that what happened before it is visible in the notified() thread. Waiting on notified() at this point will block but we can check for waiters count, which is guaranteed to be 0 because it was set before notify_waiters() call. Let's move notified() call before checking that the number of waiters is 0. Signed-off-by: Alexandru Matei --- src/asynchronous/shutdown.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/asynchronous/shutdown.rs b/src/asynchronous/shutdown.rs index 6ea6a92c..4d883c35 100644 --- a/src/asynchronous/shutdown.rs +++ b/src/asynchronous/shutdown.rs @@ -144,24 +144,25 @@ impl Notifier { /// Wait for all [`Waiter`]s to drop. pub async fn wait_all_exit(&self) -> Result<(), Elapsed> { //debug_assert!(self.shared.is_shutdown()); - if self.waiters() == 0 { - return Ok(()); - } - let wait = self.wait(); - if self.waiters() == 0 { - return Ok(()); - } - wait.await - } - - async fn wait(&self) -> Result<(), Elapsed> { if let Some(tm) = self.wait_time { - timeout(tm, self.shared.notify_exit.notified()).await + timeout(tm, self.wait()).await } else { - self.shared.notify_exit.notified().await; + self.wait().await; Ok(()) } } + + async fn wait(&self) { + while self.waiters() > 0 { + let notified = self.shared.notify_exit.notified(); + if self.waiters() == 0 { + return; + } + notified.await; + // Some waiters could have been created in the meantime + // by calling `subscribe`, loop again + } + } } impl Drop for Notifier { From 59c3b3e706fc2f98f577492e4d7edfab8b3945f5 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Thu, 7 Mar 2024 20:01:44 +0800 Subject: [PATCH 02/11] fix timing issue of streaming the stream request and data is handle asynchronously, if the data is handled when stream is not created yet, the data will be discarded and an error occur. Signed-off-by: Abel --- src/asynchronous/server.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/asynchronous/server.rs b/src/asynchronous/server.rs index 26c49f2c..9158e648 100644 --- a/src/asynchronous/server.rs +++ b/src/asynchronous/server.rs @@ -381,12 +381,14 @@ impl ReaderDelegate for ServerReader { async fn handle_msg(&self, msg: GenMessage) { let handler_shutdown_waiter = self.handler_shutdown.subscribe(); let context = self.context(); + let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); spawn(async move { select! { - _ = context.handle_msg(msg) => {} + _ = context.handle_msg(msg, wait_tx) => {} _ = handler_shutdown_waiter.wait_shutdown() => {} } }); + wait_rx.await.unwrap_or_default(); } async fn handle_err(&self, header: MessageHeader, e: Error) { @@ -424,7 +426,7 @@ impl HandlerContext { }) .ok(); } - async fn handle_msg(&self, msg: GenMessage) { + async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) { let stream_id = msg.header.stream_id; if (stream_id % 2) != 1 { @@ -438,7 +440,7 @@ impl HandlerContext { } match msg.header.type_ { - MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await { + MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await { Ok(opt_msg) => match opt_msg { Some(mut resp) => { // Server: check size before sending to client @@ -471,6 +473,8 @@ impl HandlerContext { Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await, }, MESSAGE_TYPE_DATA => { + // no need to wait data message handling + drop(wait_tx); // TODO(wllenyj): Compatible with golang behavior. if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED && !msg.payload.is_empty() @@ -518,7 +522,11 @@ impl HandlerContext { } } - async fn handle_request(&self, msg: GenMessage) -> StdResult, Status> { + async fn handle_request( + &self, + msg: GenMessage, + wait_tx: tokio::sync::oneshot::Sender<()>, + ) -> StdResult, Status> { //TODO: //if header.stream_id <= self.last_stream_id { // return Err; @@ -539,10 +547,11 @@ impl HandlerContext { })?; if let Some(method) = srv.get_method(&req.method) { + drop(wait_tx); return self.handle_method(method, req_msg).await; } if let Some(stream) = srv.get_stream(&req.method) { - return self.handle_stream(stream, req_msg).await; + return self.handle_stream(stream, req_msg, wait_tx).await; } Err(get_status( Code::UNIMPLEMENTED, @@ -598,6 +607,7 @@ impl HandlerContext { &self, stream: Arc, req_msg: Message, + wait_tx: tokio::sync::oneshot::Sender<()>, ) -> StdResult, Status> { let stream_id = req_msg.header.stream_id; let req = req_msg.payload; @@ -609,6 +619,8 @@ impl HandlerContext { let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA; + drop(wait_tx); + let si = StreamInner::new( stream_id, self.tx.clone(), From 3355f7d2c487634dd891a3425cee088b6cf06df0 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Thu, 14 Mar 2024 12:07:24 +0800 Subject: [PATCH 03/11] add channel to get send result Currently the send() method of stream implemented by send the value to an unbounded channel, so even the connection is closed for a long time, the send function still return succeed. This commit adds a channel to the message so that we can wait until the message is truely written to the connection. Signed-off-by: Abel Feng --- src/asynchronous/client.rs | 8 ++++--- src/asynchronous/connection.rs | 14 ++++++++---- src/asynchronous/server.rs | 8 +++---- src/asynchronous/stream.rs | 42 ++++++++++++++++++++++++++++++---- 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/src/asynchronous/client.rs b/src/asynchronous/client.rs index ed2ee3a8..80d4e7b0 100644 --- a/src/asynchronous/client.rs +++ b/src/asynchronous/client.rs @@ -27,6 +27,8 @@ use crate::r#async::stream::{ }; use crate::r#async::utils; +use super::stream::SendingMessage; + /// A ttrpc Client (async). #[derive(Clone)] pub struct Client { @@ -78,7 +80,7 @@ impl Client { self.streams.lock().unwrap().insert(stream_id, tx); self.req_tx - .send(msg) + .send(SendingMessage::new(msg)) .await .map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?; @@ -139,7 +141,7 @@ impl Client { // TODO: check return self.streams.lock().unwrap().insert(stream_id, tx); self.req_tx - .send(msg) + .send(SendingMessage::new(msg)) .await .map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?; @@ -204,7 +206,7 @@ struct ClientWriter { #[async_trait] impl WriterDelegate for ClientWriter { - async fn recv(&mut self) -> Option { + async fn recv(&mut self) -> Option { self.rx.recv().await } diff --git a/src/asynchronous/connection.rs b/src/asynchronous/connection.rs index 6372b25f..3ea062c4 100644 --- a/src/asynchronous/connection.rs +++ b/src/asynchronous/connection.rs @@ -16,6 +16,8 @@ use tokio::{ use crate::error::Error; use crate::proto::{GenMessage, GenMessageError, MessageHeader}; +use super::stream::SendingMessage; + pub trait Builder { type Reader; type Writer; @@ -25,7 +27,7 @@ pub trait Builder { #[async_trait] pub trait WriterDelegate { - async fn recv(&mut self) -> Option; + async fn recv(&mut self) -> Option; async fn disconnect(&self, msg: &GenMessage, e: Error); async fn exit(&self); } @@ -58,12 +60,14 @@ where let (reader_delegate, mut writer_delegate) = builder.build(); let writer_task = tokio::spawn(async move { - while let Some(msg) = writer_delegate.recv().await { - trace!("write message: {:?}", msg); - if let Err(e) = msg.write_to(&mut writer).await { + while let Some(mut sending_msg) = writer_delegate.recv().await { + trace!("write message: {:?}", sending_msg.msg); + if let Err(e) = sending_msg.msg.write_to(&mut writer).await { error!("write_message got error: {:?}", e); - writer_delegate.disconnect(&msg, e).await; + sending_msg.send_result(Err(e.clone())); + writer_delegate.disconnect(&sending_msg.msg, e).await; } + sending_msg.send_result(Ok(())); } writer_delegate.exit().await; trace!("Writer task exit."); diff --git a/src/asynchronous/server.rs b/src/asynchronous/server.rs index 26c49f2c..09348314 100644 --- a/src/asynchronous/server.rs +++ b/src/asynchronous/server.rs @@ -31,7 +31,7 @@ use tokio::{ #[cfg(any(target_os = "linux", target_os = "android"))] use tokio_vsock::VsockListener; -use crate::asynchronous::unix_incoming::UnixIncoming; +use crate::asynchronous::{stream::SendingMessage, unix_incoming::UnixIncoming}; use crate::common::{self, Domain}; use crate::context; use crate::error::{get_status, Error, Result}; @@ -339,7 +339,7 @@ struct ServerWriter { #[async_trait] impl WriterDelegate for ServerWriter { - async fn recv(&mut self) -> Option { + async fn recv(&mut self) -> Option { self.rx.recv().await } async fn disconnect(&self, _msg: &GenMessage, _: Error) {} @@ -462,7 +462,7 @@ impl HandlerContext { }; self.tx - .send(msg) + .send(SendingMessage::new(msg)) .await .map_err(err_to_others_err!(e, "Send packet to sender error ")) .ok(); @@ -652,7 +652,7 @@ impl HandlerContext { header: MessageHeader::new_response(stream_id, payload.len() as u32), payload, }; - tx.send(msg) + tx.send(SendingMessage::new(msg)) .await .map_err(err_to_others_err!(e, "Send packet to sender error ")) } diff --git a/src/asynchronous/stream.rs b/src/asynchronous/stream.rs index d3db18d6..5172ddb4 100644 --- a/src/asynchronous/stream.rs +++ b/src/asynchronous/stream.rs @@ -17,12 +17,42 @@ use crate::proto::{ MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE, }; -pub type MessageSender = mpsc::Sender; -pub type MessageReceiver = mpsc::Receiver; +pub type MessageSender = mpsc::Sender; +pub type MessageReceiver = mpsc::Receiver; pub type ResultSender = mpsc::Sender>; pub type ResultReceiver = mpsc::Receiver>; +#[derive(Debug)] +pub struct SendingMessage { + pub msg: GenMessage, + pub result_chan: Option>>, +} + +impl SendingMessage { + pub fn new(msg: GenMessage) -> Self { + Self { + msg, + result_chan: None, + } + } + pub fn new_with_result( + msg: GenMessage, + result_chan: tokio::sync::oneshot::Sender>, + ) -> Self { + Self { + msg, + result_chan: Some(result_chan), + } + } + + pub fn send_result(&mut self, result: Result<()>) { + if let Some(result_ch) = self.result_chan.take() { + result_ch.send(result).unwrap_or_default(); + } + } +} + #[derive(Debug)] pub struct ClientStream { tx: CSSender, @@ -317,9 +347,13 @@ async fn _recv(rx: &mut ResultReceiver) -> Result { } async fn _send(tx: &MessageSender, msg: GenMessage) -> Result<()> { - tx.send(msg) + let (res_tx, res_rx) = tokio::sync::oneshot::channel(); + tx.send(SendingMessage::new_with_result(msg, res_tx)) + .await + .map_err(|e| Error::Others(format!("Send data packet to sender error {:?}", e)))?; + res_rx .await - .map_err(|e| Error::Others(format!("Send data packet to sender error {e:?}"))) + .map_err(|e| Error::Others(format!("Failed to wait send result {:?}", e)))? } #[derive(Clone, Copy, Debug, PartialEq, Eq)] From 1f2a266d9ed7d8c1a5e07d1f34cd47560590ab99 Mon Sep 17 00:00:00 2001 From: Xynnn007 Date: Mon, 19 Aug 2024 17:09:04 +0800 Subject: [PATCH 04/11] Codegen: convert Arc> to Arc This commit changes the generated ttrpc server from Arc> to Arc. This helps the type conversion and also avoids extra runtime cost caused by double pointer. Fixes #234 Signed-off-by: Xynnn007 --- compiler/src/codegen.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/compiler/src/codegen.rs b/compiler/src/codegen.rs index 382f5ae7..853fa757 100644 --- a/compiler/src/codegen.rs +++ b/compiler/src/codegen.rs @@ -145,7 +145,7 @@ impl<'a> MethodGen<'a> { "}", |w| { w.write_line(&format!( - "service: Arc>,", + "service: Arc,", self.service_name )); }, @@ -558,7 +558,7 @@ impl<'a> ServiceGen<'a> { fn write_sync_server_create(&self, w: &mut CodeWriter) { let method_handler_name = "::ttrpc::MethodHandler"; let s = format!( - "create_{}(service: Arc>) -> HashMap>", + "create_{}(service: Arc) -> HashMap>", to_snake_case(&self.service_name()), self.service_name(), method_handler_name, @@ -577,7 +577,7 @@ impl<'a> ServiceGen<'a> { fn write_async_server_create(&self, w: &mut CodeWriter) { let s = format!( - "create_{}(service: Arc>) -> HashMap", + "create_{}(service: Arc) -> HashMap", to_snake_case(&self.service_name()), self.service_name(), "::ttrpc::r#async::Service" @@ -642,7 +642,6 @@ fn write_generated_common(w: &mut CodeWriter) { w.write_line("#![cfg_attr(rustfmt, rustfmt_skip)]"); w.write_line("#![allow(unknown_lints)]"); w.write_line("#![allow(clipto_camel_casepy)]"); - w.write_line("#![allow(box_pointers)]"); w.write_line("#![allow(dead_code)]"); w.write_line("#![allow(missing_docs)]"); w.write_line("#![allow(non_camel_case_types)]"); From 37f4a2a5ebede50e67a9bb161a21d5c49d908bf6 Mon Sep 17 00:00:00 2001 From: Quanwei Zhou Date: Mon, 23 Sep 2024 14:54:34 +0800 Subject: [PATCH 05/11] server: fix server exit once a accept failed If the Accept error occurs, an error can be output to ensure that the subsequent connect can be accepted normally. Fixes: #239 Signed-off-by: Quanwei Zhou --- src/sync/server.rs | 8 ++++++-- src/sync/sys/unix/net.rs | 12 +----------- src/sync/sys/windows/net.rs | 17 ++++------------- 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/src/sync/server.rs b/src/sync/server.rs index b562996a..00cfb95b 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -358,7 +358,11 @@ impl Server { .spawn(move || { loop { trace!("listening..."); - let pipe_connection = match listener.accept(&listener_quit_flag) { + if listener_quit_flag.load(Ordering::SeqCst) { + info!("listener shutdown for quit flag"); + break; + } + let pipe_connection = match listener.accept() { Ok(None) => { continue; } @@ -369,7 +373,7 @@ impl Server { } Err(e) => { error!("listener accept got {:?}", e); - break; + continue; } }; diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 11155d3a..bf109bad 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -21,8 +21,6 @@ use std::os::unix::prelude::AsRawFd; use nix::Error; use nix::unistd::*; -use std::sync::{Arc}; -use std::sync::atomic::{AtomicBool, Ordering}; use crate::common::{self, client_connect, SOCK_CLOEXEC}; #[cfg(target_os = "macos")] use crate::common::set_fd_close_exec; @@ -84,11 +82,7 @@ impl PipeListener { // - Ok(Some(PipeConnection)) if a new connection is established // - Ok(None) if spurious wake up with no new connection // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept( &self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { let mut pollers = vec![ libc::pollfd { fd: self.monitor_fd.0, @@ -127,10 +121,6 @@ impl PipeListener { return Ok(None); } - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } - #[cfg(any(target_os = "linux", target_os = "android"))] let fd = match accept4(self.fd, SockFlag::SOCK_CLOEXEC) { Ok(fd) => fd, diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index 87fcd002..cbbecb45 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -23,7 +23,6 @@ use std::os::windows::ffi::OsStrExt; use std::os::windows::fs::OpenOptionsExt; use std::os::windows::io::{IntoRawHandle}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc}; use std::{io}; use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE }; @@ -75,14 +74,7 @@ impl PipeListener { // accept returns: // - Ok(Some(PipeConnection)) if a new connection is established // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept(&self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new( - io::ErrorKind::Other, - "listener shutdown for quit flag", - )); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { // Create a new pipe instance for every new client let instance = self.new_instance()?; let np = match PipeConnection::new(instance) { @@ -376,6 +368,7 @@ fn handle_windows_error(e: io::Error) -> Error { #[cfg(test)] mod test { use super::*; + use std::sync::Arc; use windows_sys::Win32::Foundation::ERROR_FILE_NOT_FOUND; #[test] @@ -398,8 +391,7 @@ mod test { let listener_server = listener.clone(); let thread = std::thread::spawn(move || { - let quit_flag = Arc::new(AtomicBool::new(false)); - match listener_server.accept(&quit_flag) { + match listener_server.accept() { Ok(Some(_)) => { // pipe is working } @@ -422,8 +414,7 @@ mod test { let listener_server = listener.clone(); let thread = std::thread::spawn(move || { - let quit_flag = Arc::new(AtomicBool::new(false)); - match listener_server.accept(&quit_flag) { + match listener_server.accept() { Ok(_) => { panic!("should not get pipe on close") } From c3889f6c3bfd9b326f16cd035cc02306f800d442 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Tue, 24 Sep 2024 16:56:45 +0800 Subject: [PATCH 06/11] gh-action: Fix Codecov Add token for Codecov to fixe problem: "Rate limit reached. Please upload with the Codecov repository upload token to resolve issue" Signed-off-by: Tim Zhang --- .github/workflows/cov.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml index 9f69dd93..53eb524d 100644 --- a/.github/workflows/cov.yml +++ b/.github/workflows/cov.yml @@ -16,7 +16,9 @@ jobs: - name: Generate code coverage run: cargo llvm-cov --all-features --lcov --output-path lcov.info - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} with: files: lcov.info fail_ci_if_error: true From dc6b69751d0f6a4d367268aad4974c35d639c5e3 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Tue, 24 Sep 2024 18:07:05 +0800 Subject: [PATCH 07/11] Update timeout to infinite for poll of accept It should be a mistake to modify it from -1 to 10 in the PR #226 https://github.com/containerd/ttrpc-rust/pull/226/files#diff-e74ddb472174f24fb4713f5a2fe2d33bbc5db28ee2a5c7dad1ea9025b897e8a5R110 There already are monitor_fd to notify exit, so accept without timeout is safe. Signed-off-by: Tim Zhang --- src/sync/sys/unix/net.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index bf109bad..3deadf41 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -101,7 +101,7 @@ impl PipeListener { libc::poll( pollers as *mut _ as *mut libc::pollfd, pollers.len() as _, - POLL_MAX_TIME, + -1, ) }; From 444a4906eba9eb64d448bb085be49479da9744ba Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Wed, 25 Sep 2024 12:18:45 +0800 Subject: [PATCH 08/11] example: update to match new ttrpc-compiler update for commit 1f2a26 Signed-off-by: Tim Zhang --- example/Cargo.toml | 4 ++++ example/async-client.rs | 2 +- example/async-server.rs | 11 +++-------- example/async-stream-client.rs | 2 +- example/async-stream-server.rs | 8 ++------ example/protocols/mod.rs | 2 -- example/server.rs | 10 ++-------- 7 files changed, 13 insertions(+), 26 deletions(-) diff --git a/example/Cargo.toml b/example/Cargo.toml index 47094e6b..64116403 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -51,3 +51,7 @@ path = "./async-stream-client.rs" [build-dependencies] ttrpc-codegen = { path = "../ttrpc-codegen"} + +[patch.crates-io] +ttrpc-compiler = { path = "../compiler"} + diff --git a/example/async-client.rs b/example/async-client.rs index 4c4b7e9f..c298cbd2 100644 --- a/example/async-client.rs +++ b/example/async-client.rs @@ -6,7 +6,7 @@ mod protocols; mod utils; #[cfg(unix)] -use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc}; +use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc}; use ttrpc::context::{self, Context}; #[cfg(unix)] use ttrpc::r#async::Client; diff --git a/example/async-server.rs b/example/async-server.rs index 1ee2fb79..074d8274 100644 --- a/example/async-server.rs +++ b/example/async-server.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use log::LevelFilter; #[cfg(unix)] -use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types}; +use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc, types}; #[cfg(unix)] use ttrpc::asynchronous::Server; use ttrpc::error::{Error, Result}; @@ -97,13 +97,8 @@ fn main() { async fn main() { simple_logging::log_to_stderr(LevelFilter::Trace); - let h = Box::new(HealthService {}) as Box; - let h = Arc::new(h); - let hservice = health_ttrpc::create_health(h); - - let a = Box::new(AgentService {}) as Box; - let a = Arc::new(a); - let aservice = agent_ttrpc::create_agent_service(a); + let hservice = health_ttrpc::create_health(Arc::new(HealthService {})); + let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {})); utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap(); diff --git a/example/async-stream-client.rs b/example/async-stream-client.rs index 6186de39..04f8c7b6 100644 --- a/example/async-stream-client.rs +++ b/example/async-stream-client.rs @@ -6,7 +6,7 @@ mod protocols; mod utils; #[cfg(unix)] -use protocols::r#async::{empty, streaming, streaming_ttrpc}; +use protocols::asynchronous::{empty, streaming, streaming_ttrpc}; use ttrpc::context::{self, Context}; #[cfg(unix)] use ttrpc::r#async::Client; diff --git a/example/async-stream-server.rs b/example/async-stream-server.rs index 250a5ff5..4dcba884 100644 --- a/example/async-stream-server.rs +++ b/example/async-stream-server.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use log::{info, LevelFilter}; #[cfg(unix)] -use protocols::r#async::{empty, streaming, streaming_ttrpc}; +use protocols::asynchronous::{empty, streaming, streaming_ttrpc}; #[cfg(unix)] use ttrpc::{asynchronous::Server, Error}; @@ -163,11 +163,7 @@ fn main() { #[tokio::main(flavor = "current_thread")] async fn main() { simple_logging::log_to_stderr(LevelFilter::Info); - - let s = Box::new(StreamingService {}) as Box; - let s = Arc::new(s); - let service = streaming_ttrpc::create_streaming(s); - + let service = streaming_ttrpc::create_streaming(Arc::new(StreamingService {})); utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap(); let mut server = Server::new() diff --git a/example/protocols/mod.rs b/example/protocols/mod.rs index d3d3a275..bef129e7 100644 --- a/example/protocols/mod.rs +++ b/example/protocols/mod.rs @@ -4,6 +4,4 @@ // #[cfg(unix)] pub mod asynchronous; -#[cfg(unix)] -pub use asynchronous as r#async; pub mod sync; diff --git a/example/server.rs b/example/server.rs index c15bf337..54d33f21 100644 --- a/example/server.rs +++ b/example/server.rs @@ -81,14 +81,8 @@ impl agent_ttrpc::AgentService for AgentService { fn main() { simple_logging::log_to_stderr(LevelFilter::Trace); - - let h = Box::new(HealthService {}) as Box; - let h = Arc::new(h); - let hservice = health_ttrpc::create_health(h); - - let a = Box::new(AgentService {}) as Box; - let a = Arc::new(a); - let aservice = agent_ttrpc::create_agent_service(a); + let hservice = health_ttrpc::create_health(Arc::new(HealthService {})); + let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {})); utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap(); let mut server = Server::new() From 0879e83e9f8d848059dfa405fd83dc607bba9edb Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Thu, 26 Sep 2024 16:29:46 +0800 Subject: [PATCH 09/11] compiler: release v0.6.3 Cut the release for #235. Signed-off-by: Tim Zhang --- compiler/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compiler/Cargo.toml b/compiler/Cargo.toml index 13eabdd6..e449d34d 100644 --- a/compiler/Cargo.toml +++ b/compiler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ttrpc-compiler" -version = "0.6.2" +version = "0.6.3" edition = "2018" authors = ["The AntFin Kata Team "] license = "Apache-2.0" From 4dedde5975c2b58935f6758fd99f1ddb651ad637 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Thu, 26 Sep 2024 16:30:53 +0800 Subject: [PATCH 10/11] ttrpc: release v0.8.2 Bump the version to v0.8.2 Signed-off-by: Tim Zhang --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 78ba208a..0df03998 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ttrpc" -version = "0.8.1" +version = "0.8.2" authors = ["The AntFin Kata Team "] edition = "2018" license = "Apache-2.0" From aef8aaa3c2a850ed0b77a9bf8f6648780382346b Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Thu, 26 Sep 2024 16:54:30 +0800 Subject: [PATCH 11/11] gh-action: remove codecov MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We have no time to pay attention to the code coverage, and it doesn’t make sense for every CI to fail because of this. Signed-off-by: Tim Zhang --- .github/codecov.yml | 2 -- .github/workflows/cov.yml | 24 ------------------------ 2 files changed, 26 deletions(-) delete mode 100644 .github/codecov.yml delete mode 100644 .github/workflows/cov.yml diff --git a/.github/codecov.yml b/.github/codecov.yml deleted file mode 100644 index e00ce3d6..00000000 --- a/.github/codecov.yml +++ /dev/null @@ -1,2 +0,0 @@ -github_checks: - annotations: false diff --git a/.github/workflows/cov.yml b/.github/workflows/cov.yml deleted file mode 100644 index 53eb524d..00000000 --- a/.github/workflows/cov.yml +++ /dev/null @@ -1,24 +0,0 @@ -name: Coverage - -on: [pull_request] - -jobs: - coverage: - runs-on: ubuntu-latest - env: - CARGO_TERM_COLOR: always - steps: - - uses: actions/checkout@v3 - - name: Install Rust - run: rustup toolchain install stable --component llvm-tools-preview - - name: Install cargo-llvm-cov - uses: taiki-e/install-action@cargo-llvm-cov - - name: Generate code coverage - run: cargo llvm-cov --all-features --lcov --output-path lcov.info - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v4 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - with: - files: lcov.info - fail_ci_if_error: true