Skip to content

Commit

Permalink
Merge branch 'containerd:master' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
jokemanfire authored Sep 27, 2024
2 parents 197c56b + 80d4ff4 commit c7db103
Show file tree
Hide file tree
Showing 20 changed files with 117 additions and 117 deletions.
2 changes: 0 additions & 2 deletions .github/codecov.yml

This file was deleted.

22 changes: 0 additions & 22 deletions .github/workflows/cov.yml

This file was deleted.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ttrpc"
version = "0.8.1"
version = "0.8.2"
authors = ["The AntFin Kata Team <[email protected]>"]
edition = "2018"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion compiler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ttrpc-compiler"
version = "0.6.2"
version = "0.6.3"
edition = "2018"
authors = ["The AntFin Kata Team <[email protected]>"]
license = "Apache-2.0"
Expand Down
7 changes: 3 additions & 4 deletions compiler/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<'a> MethodGen<'a> {
"}",
|w| {
w.write_line(&format!(
"service: Arc<Box<dyn {} + Send + Sync>>,",
"service: Arc<dyn {} + Send + Sync>,",
self.service_name
));
},
Expand Down Expand Up @@ -558,7 +558,7 @@ impl<'a> ServiceGen<'a> {
fn write_sync_server_create(&self, w: &mut CodeWriter) {
let method_handler_name = "::ttrpc::MethodHandler";
let s = format!(
"create_{}(service: Arc<Box<dyn {} + Send + Sync>>) -> HashMap<String, Box<dyn {} + Send + Sync>>",
"create_{}(service: Arc<dyn {} + Send + Sync>) -> HashMap<String, Box<dyn {} + Send + Sync>>",
to_snake_case(&self.service_name()),
self.service_name(),
method_handler_name,
Expand All @@ -577,7 +577,7 @@ impl<'a> ServiceGen<'a> {

fn write_async_server_create(&self, w: &mut CodeWriter) {
let s = format!(
"create_{}(service: Arc<Box<dyn {} + Send + Sync>>) -> HashMap<String, {}>",
"create_{}(service: Arc<dyn {} + Send + Sync>) -> HashMap<String, {}>",
to_snake_case(&self.service_name()),
self.service_name(),
"::ttrpc::r#async::Service"
Expand Down Expand Up @@ -642,7 +642,6 @@ fn write_generated_common(w: &mut CodeWriter) {
w.write_line("#![cfg_attr(rustfmt, rustfmt_skip)]");
w.write_line("#![allow(unknown_lints)]");
w.write_line("#![allow(clipto_camel_casepy)]");
w.write_line("#![allow(box_pointers)]");
w.write_line("#![allow(dead_code)]");
w.write_line("#![allow(missing_docs)]");
w.write_line("#![allow(non_camel_case_types)]");
Expand Down
4 changes: 4 additions & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ path = "./async-stream-client.rs"

[build-dependencies]
ttrpc-codegen = { path = "../ttrpc-codegen"}

[patch.crates-io]
ttrpc-compiler = { path = "../compiler"}

2 changes: 1 addition & 1 deletion example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
mod protocols;
mod utils;
#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;
Expand Down
11 changes: 3 additions & 8 deletions example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::Arc;
use log::LevelFilter;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
Expand Down Expand Up @@ -97,13 +97,8 @@ fn main() {
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let h = Arc::new(h);
let hservice = health_ttrpc::create_health(h);

let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let a = Arc::new(a);
let aservice = agent_ttrpc::create_agent_service(a);
let hservice = health_ttrpc::create_health(Arc::new(HealthService {}));
let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {}));

utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
mod protocols;
mod utils;
#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
use protocols::asynchronous::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;
Expand Down
8 changes: 2 additions & 6 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use log::{info, LevelFilter};

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
use protocols::asynchronous::{empty, streaming, streaming_ttrpc};
#[cfg(unix)]
use ttrpc::{asynchronous::Server, Error};

Expand Down Expand Up @@ -163,11 +163,7 @@ fn main() {
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Info);

let s = Box::new(StreamingService {}) as Box<dyn streaming_ttrpc::Streaming + Send + Sync>;
let s = Arc::new(s);
let service = streaming_ttrpc::create_streaming(s);

let service = streaming_ttrpc::create_streaming(Arc::new(StreamingService {}));
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

let mut server = Server::new()
Expand Down
2 changes: 0 additions & 2 deletions example/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@
//
#[cfg(unix)]
pub mod asynchronous;
#[cfg(unix)]
pub use asynchronous as r#async;
pub mod sync;
10 changes: 2 additions & 8 deletions example/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,8 @@ impl agent_ttrpc::AgentService for AgentService {

fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let h = Arc::new(h);
let hservice = health_ttrpc::create_health(h);

let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let a = Arc::new(a);
let aservice = agent_ttrpc::create_agent_service(a);
let hservice = health_ttrpc::create_health(Arc::new(HealthService {}));
let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {}));

utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();
let mut server = Server::new()
Expand Down
8 changes: 5 additions & 3 deletions src/asynchronous/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::r#async::stream::{
};
use crate::r#async::utils;

use super::stream::SendingMessage;

/// A ttrpc Client (async).
#[derive(Clone)]
pub struct Client {
Expand Down Expand Up @@ -78,7 +80,7 @@ impl Client {
self.streams.lock().unwrap().insert(stream_id, tx);

self.req_tx
.send(msg)
.send(SendingMessage::new(msg))
.await
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;

Expand Down Expand Up @@ -139,7 +141,7 @@ impl Client {
// TODO: check return
self.streams.lock().unwrap().insert(stream_id, tx);
self.req_tx
.send(msg)
.send(SendingMessage::new(msg))
.await
.map_err(|e| Error::Others(format!("Send packet to sender error {e:?}")))?;

Expand Down Expand Up @@ -204,7 +206,7 @@ struct ClientWriter {

#[async_trait]
impl WriterDelegate for ClientWriter {
async fn recv(&mut self) -> Option<GenMessage> {
async fn recv(&mut self) -> Option<SendingMessage> {
self.rx.recv().await
}

Expand Down
14 changes: 9 additions & 5 deletions src/asynchronous/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use tokio::{
use crate::error::Error;
use crate::proto::{GenMessage, GenMessageError, MessageHeader};

use super::stream::SendingMessage;

pub trait Builder {
type Reader;
type Writer;
Expand All @@ -25,7 +27,7 @@ pub trait Builder {

#[async_trait]
pub trait WriterDelegate {
async fn recv(&mut self) -> Option<GenMessage>;
async fn recv(&mut self) -> Option<SendingMessage>;
async fn disconnect(&self, msg: &GenMessage, e: Error);
async fn exit(&self);
}
Expand Down Expand Up @@ -58,12 +60,14 @@ where
let (reader_delegate, mut writer_delegate) = builder.build();

let writer_task = tokio::spawn(async move {
while let Some(msg) = writer_delegate.recv().await {
trace!("write message: {:?}", msg);
if let Err(e) = msg.write_to(&mut writer).await {
while let Some(mut sending_msg) = writer_delegate.recv().await {
trace!("write message: {:?}", sending_msg.msg);
if let Err(e) = sending_msg.msg.write_to(&mut writer).await {
error!("write_message got error: {:?}", e);
writer_delegate.disconnect(&msg, e).await;
sending_msg.send_result(Err(e.clone()));
writer_delegate.disconnect(&sending_msg.msg, e).await;
}
sending_msg.send_result(Ok(()));
}
writer_delegate.exit().await;
trace!("Writer task exit.");
Expand Down
30 changes: 21 additions & 9 deletions src/asynchronous/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::{
#[cfg(any(target_os = "linux", target_os = "android"))]
use tokio_vsock::VsockListener;

use crate::asynchronous::unix_incoming::UnixIncoming;
use crate::asynchronous::{stream::SendingMessage, unix_incoming::UnixIncoming};
use crate::common::{self, Domain};
use crate::context;
use crate::error::{get_status, Error, Result};
Expand Down Expand Up @@ -339,7 +339,7 @@ struct ServerWriter {

#[async_trait]
impl WriterDelegate for ServerWriter {
async fn recv(&mut self) -> Option<GenMessage> {
async fn recv(&mut self) -> Option<SendingMessage> {
self.rx.recv().await
}
async fn disconnect(&self, _msg: &GenMessage, _: Error) {}
Expand Down Expand Up @@ -381,12 +381,14 @@ impl ReaderDelegate for ServerReader {
async fn handle_msg(&self, msg: GenMessage) {
let handler_shutdown_waiter = self.handler_shutdown.subscribe();
let context = self.context();
let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();
spawn(async move {
select! {
_ = context.handle_msg(msg) => {}
_ = context.handle_msg(msg, wait_tx) => {}
_ = handler_shutdown_waiter.wait_shutdown() => {}
}
});
wait_rx.await.unwrap_or_default();
}

async fn handle_err(&self, header: MessageHeader, e: Error) {
Expand Down Expand Up @@ -424,7 +426,7 @@ impl HandlerContext {
})
.ok();
}
async fn handle_msg(&self, msg: GenMessage) {
async fn handle_msg(&self, msg: GenMessage, wait_tx: tokio::sync::oneshot::Sender<()>) {
let stream_id = msg.header.stream_id;

if (stream_id % 2) != 1 {
Expand All @@ -438,7 +440,7 @@ impl HandlerContext {
}

match msg.header.type_ {
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
MESSAGE_TYPE_REQUEST => match self.handle_request(msg, wait_tx).await {
Ok(opt_msg) => match opt_msg {
Some(mut resp) => {
// Server: check size before sending to client
Expand All @@ -462,7 +464,7 @@ impl HandlerContext {
};

self.tx
.send(msg)
.send(SendingMessage::new(msg))
.await
.map_err(err_to_others_err!(e, "Send packet to sender error "))
.ok();
Expand All @@ -471,6 +473,8 @@ impl HandlerContext {
Err(status) => Self::respond_with_status(self.tx.clone(), stream_id, status).await,
},
MESSAGE_TYPE_DATA => {
// no need to wait data message handling
drop(wait_tx);
// TODO(wllenyj): Compatible with golang behavior.
if (msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED
&& !msg.payload.is_empty()
Expand Down Expand Up @@ -518,7 +522,11 @@ impl HandlerContext {
}
}

async fn handle_request(&self, msg: GenMessage) -> StdResult<Option<Response>, Status> {
async fn handle_request(
&self,
msg: GenMessage,
wait_tx: tokio::sync::oneshot::Sender<()>,
) -> StdResult<Option<Response>, Status> {
//TODO:
//if header.stream_id <= self.last_stream_id {
// return Err;
Expand All @@ -539,10 +547,11 @@ impl HandlerContext {
})?;

if let Some(method) = srv.get_method(&req.method) {
drop(wait_tx);
return self.handle_method(method, req_msg).await;
}
if let Some(stream) = srv.get_stream(&req.method) {
return self.handle_stream(stream, req_msg).await;
return self.handle_stream(stream, req_msg, wait_tx).await;
}
Err(get_status(
Code::UNIMPLEMENTED,
Expand Down Expand Up @@ -598,6 +607,7 @@ impl HandlerContext {
&self,
stream: Arc<dyn StreamHandler + Send + Sync>,
req_msg: Message<Request>,
wait_tx: tokio::sync::oneshot::Sender<()>,
) -> StdResult<Option<Response>, Status> {
let stream_id = req_msg.header.stream_id;
let req = req_msg.payload;
Expand All @@ -609,6 +619,8 @@ impl HandlerContext {

let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA;

drop(wait_tx);

let si = StreamInner::new(
stream_id,
self.tx.clone(),
Expand Down Expand Up @@ -652,7 +664,7 @@ impl HandlerContext {
header: MessageHeader::new_response(stream_id, payload.len() as u32),
payload,
};
tx.send(msg)
tx.send(SendingMessage::new(msg))
.await
.map_err(err_to_others_err!(e, "Send packet to sender error "))
}
Expand Down
Loading

0 comments on commit c7db103

Please sign in to comment.