From 5f0e894771d43401576deb20bff964d6bc427625 Mon Sep 17 00:00:00 2001 From: xujian Date: Wed, 1 Jul 2020 08:09:10 -0400 Subject: [PATCH 1/8] add function param data --- Cargo.toml | 5 ++ src/data.rs | 78 ++++++++++++++++++++++ src/factory.rs | 72 +++++++++++++++++++++ src/lib.rs | 172 ++----------------------------------------------- src/server.rs | 90 ++++++++++++++++++++++++++ tests/test.rs | 47 ++++++++++++++ 6 files changed, 296 insertions(+), 168 deletions(-) create mode 100644 src/data.rs create mode 100644 src/factory.rs create mode 100644 src/server.rs create mode 100644 tests/test.rs diff --git a/Cargo.toml b/Cargo.toml index 6996cad..3f89574 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,8 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +jsonrpc-lite = "0.5.0" +fxhash = "0.2.1" + +[dependencies-dev] +tokio = { version = "0.2", features = ["full"] } diff --git a/src/data.rs b/src/data.rs new file mode 100644 index 0000000..8c1bca2 --- /dev/null +++ b/src/data.rs @@ -0,0 +1,78 @@ +use std::any::{Any, TypeId}; +use std::sync::Arc; + +use fxhash::FxHashMap; + +pub struct Data(Arc); + +impl Data { + pub fn new(d: T) -> Self { + Self(Arc::new(d)) + } + + pub fn get_ref(&self) -> &T { + self.0.as_ref() + } +} + +impl Clone for Data { + fn clone(&self) -> Data { + Data(self.0.clone()) + } +} + +pub(crate) trait DataFactory { + fn get(&self) -> Option<&D>; +} + +pub struct DataExtensions(FxHashMap>); + +impl DataExtensions { + pub fn insert(&self, t: T) { + unsafe {self.0.insert(TypeId::of::(), Box::new(t));} + } +} + +unsafe impl Sync for DataExtensions {} + +impl Default for DataExtensions { + fn default() -> Self { + Self(FxHashMap::>::default()) + } +} + +impl DataFactory for DataExtensions { + fn get(&self) -> Option<&D> { + self.0 + .get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + #[derive(Debug)] + pub struct Test { + pub a: Mutex, + pub b: Mutex, + } + + #[test] + fn test_data() { + let data = Data::new(Test { + a: Mutex::new(99u32), + b: Mutex::new("abcdefg".to_string()), + }); + + let mut extension = DataExtensions::default(); + extension.insert(data); + + let obj = extension.get::>().unwrap().clone(); + + assert_eq!(99, *obj.get_ref().a.lock().unwrap()); + assert_eq!("abcdefg".to_string(), *obj.get_ref().b.lock().unwrap()); + } +} diff --git a/src/factory.rs b/src/factory.rs new file mode 100644 index 0000000..c8769c9 --- /dev/null +++ b/src/factory.rs @@ -0,0 +1,72 @@ +use crate::data::{Data, DataExtensions, DataFactory}; +use serde::{Deserialize, Serialize}; +use std::future::Future; + +pub(crate) trait Factory: Clone + 'static +where + O: Serialize, + R: Future, +{ + fn call(&self, params: T) -> R; +} + +impl Factory<(), (), R, O> for F +where + F: Fn() -> R + Clone + 'static, + O: Serialize, + R: Future, +{ + fn call(&self, _: ()) -> R { + (self)() + } +} + +impl Factory<(&DataExtensions,), (Data,), R, O> for F +where + F: Fn(Data) -> R + Clone + 'static, + O: Serialize, + R: Future, + T: 'static, +{ + fn call(&self, params: (&DataExtensions,)) -> R { + (self)(params.0.get::>().unwrap().clone()) + } +} + +impl Factory<(P,), (P,), R, O> for F +where + F: Fn(P) -> R + Clone + 'static, + O: Serialize, + R: Future, + P: for<'de> Deserialize<'de>, +{ + fn call(&self, params: (P,)) -> R { + (self)(params.0) + } +} + +impl Factory<(&DataExtensions, P), (Data, P), R, O> for F +where + F: Fn(Data, P) -> R + Clone + 'static, + O: Serialize, + R: Future, + P: for<'de> Deserialize<'de>, + T: 'static, +{ + fn call(&self, params: (&DataExtensions, P)) -> R { + (self)(params.0.get::>().unwrap().clone(), params.1) + } +} + +impl Factory<(&DataExtensions, P), (P, Data), R, O> for F +where + F: Fn(P, Data) -> R + Clone + 'static, + O: Serialize, + R: Future, + P: for<'de> Deserialize<'de>, + T: 'static, +{ + fn call(&self, params: (&DataExtensions, P)) -> R { + (self)(params.1, params.0.get::>().unwrap().clone()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 089e222..b3c4945 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,172 +1,8 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -#[derive(Deserialize, Debug)] -pub(crate) struct Request { - pub jsonrpc: String, - pub method: String, - pub params: serde_json::Value, - pub id: i64, -} -#[derive(Serialize, Debug)] -pub(crate) struct Response { - jsonrpc: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - id: i64, -} +mod data; +pub use data::Data; -impl Response { - pub fn new(id: i64, result: Result) -> Self { - match result { - Ok(r) => Response { - jsonrpc: "2.0".to_string(), - error: None, - result: Some(r), - id, - }, - Err(e) => Response { - jsonrpc: "2.0".to_string(), - result: None, - error: Some(e), - id, - }, - } - } -} +mod factory; -pub(crate) trait DataFactory {} - -pub struct Data(T); - -impl Data { - pub fn new(t: T) -> Self { - Data(t) - } -} - -impl DataFactory for Data {} - -pub(crate) trait Factory: Clone + 'static -where - O: Serialize, - R: Future, -{ - fn call(&self, params: T) -> R; -} - -impl Factory<(), R, O> for F -where - F: Fn() -> R + Clone + 'static, - O: Serialize, - R: Future, -{ - fn call(&self, _: ()) -> R { - (self)() - } -} - -impl Factory<(Data,), R, O> for F -where - F: Fn(Data) -> R + Clone + 'static, - O: Serialize, - R: Future, -{ - fn call(&self, params: (Data,)) -> R { - (self)(params.0) - } -} - -impl Factory<(P,), R, O> for F -where - F: Fn(P) -> R + Clone + 'static, - O: Serialize, - R: Future, - P: for<'de> Deserialize<'de>, -{ - fn call(&self, params: (P,)) -> R { - (self)(params.0) - } -} - -impl Factory<(Data, P), R, O> for F -where - F: Fn(Data, P) -> R + Clone + 'static, - O: Serialize, - R: Future, - P: for<'de> Deserialize<'de>, -{ - fn call(&self, params: (Data, P)) -> R { - (self)(params.0, params.1) - } -} - -impl Factory<(P, Data), R, O> for F -where - F: Fn(P, Data) -> R + Clone + 'static, - O: Serialize, - R: Future, - P: for<'de> Deserialize<'de>, -{ - fn call(&self, params: (P, Data)) -> R { - (self)(params.0, params.1) - } -} - -pub struct Server { - map: HashMap< - String, - Box Pin + Send>>>, - >, - state: Option>, -} - -impl Server { - pub fn new() -> Self { - Server { - map: HashMap::new(), - state: None, - } - } - - pub fn to(mut self, key: String, handle: H) -> Self - where - P: for<'de> Deserialize<'de> + Send + 'static, - R: Serialize + 'static, - E: Serialize + 'static, - F: Future> + Send + 'static, - H: Fn(P) -> F + 'static + Send, - { - let inner_handle = - move |req: Request| -> Pin + Send>> { - async fn inner(req: Request, handle: H) -> serde_json::Value - where - P: for<'de> Deserialize<'de> + Send + 'static, - R: Serialize + 'static, - E: Serialize + 'static, - F: Future> + Send + 'static, - H: Fn(P) -> F + 'static + Send, - { - let params: P = serde_json::from_value(req.params).unwrap(); - let _r = (handle)(params); - let result = Response::new(req.id, _r.await); - serde_json::to_value(result).unwrap() - } - Box::pin(inner(req, handle)) - }; - self.map.insert(key, Box::new(inner_handle)); - self - } - - pub fn data(mut self, d: D) -> Self { - if self.state.is_none() { - self.state = Some(Box::new(Data::new(d))) - } - self - } -} +pub mod server; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..22d0f33 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,90 @@ +use crate::data::{Data, DataExtensions}; +use jsonrpc_lite::Error as JsonRpcError; +use jsonrpc_lite::JsonRpc; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use crate::factory::Factory; + +#[derive(Deserialize, Debug)] +pub struct Request { + pub jsonrpc: String, + pub method: String, + pub params: serde_json::Value, + pub id: i64, +} + + +pub struct Server { + map: HashMap< + String, + Box Pin + Send>>>, + >, + extensions: DataExtensions, +} + +impl Server { + pub fn new() -> Self { + Server { + map: HashMap::new(), + extensions: DataExtensions::default(), + } + } + + pub fn to(mut self, key: String, handle: H) -> Self + where + P: for<'de> Deserialize<'de> + Send + 'static, + R: Serialize + 'static, + E: Serialize + Into + 'static, + F: Future> + Send + 'static, + H: Factory<(&'static DataExtensions, P), (Data, P), F, Result> + Send + 'static, + T: 'static, + { + let inner_handle = + move |req: Request| -> Pin + Send>> { + async fn inner(extensions: &'static DataExtensions, req: Request, handle: H) -> serde_json::Value + where + P: for<'de> Deserialize<'de> + Send + 'static, + R: Serialize + 'static, + E: Serialize + Into + 'static, + F: Future> + Send + 'static, + H: Factory<(&'static DataExtensions, P), (Data, P), F, Result> + Send + 'static, + T: 'static, + { + let params: P = serde_json::from_value(req.params).unwrap(); + let _r = (handle).call((extensions, params)); + match _r.await { + Ok(result) => serde_json::to_value(JsonRpc::success( + req.id, + &serde_json::to_value(result).unwrap(), + )) + .unwrap(), + Err(err) => { + serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() + } + } + } + Box::pin(inner(&self.extensions, req, handle)) + }; + self.map.insert(key, Box::new(inner_handle)); + self + } + + pub fn data(mut self, d: D) -> Self { + self.extensions.insert(Data::new(d)); + self + } + + /// 传入一个Value格式的json-rpc单独请求 + /// 返回响应 + pub async fn route(&self, req_str: serde_json::Value) -> serde_json::Value { + let req: Request = serde_json::from_value(req_str).unwrap(); + let handle = match self.map.get(&req.method) { + Some(handle) => handle, + None => return serde_json::to_value(JsonRpc::error(req.id, JsonRpcError::method_not_found())).unwrap(), + }; + + handle(req).await + } +} diff --git a/tests/test.rs b/tests/test.rs new file mode 100644 index 0000000..6421459 --- /dev/null +++ b/tests/test.rs @@ -0,0 +1,47 @@ +use jsonrpc_ws::server::{Request, Server}; +use jsonrpc_ws::Data; +use std::sync::Mutex; +use tokio::time::{self, Duration}; +use serde_json::json; + +#[derive(Debug)] +pub struct Test { + pub a: Mutex, + pub b: Mutex, +} + +async fn route_a(local_test: Data) { + let mut a = *local_test.get_ref().a.lock().unwrap(); + + a += 1; + assert_eq!(100u32, a); +} + +async fn route_b(local_test: Data, req: Request) { + let timeout = 1000u64; + time::delay_for(Duration::from_millis(timeout.into())); + + let mut a = *local_test.get_ref().a.lock().unwrap(); + + a += 1; + assert_eq!(100u32, a); +} + +#[test] +fn test_server() { + let mut server = Server::new() + .data(Test { + a: Mutex::new(99u32), + b: Mutex::new("abcdefg".to_string()), + }) + .to("route_b".to_string(), route_b); + + let runtime = tokio::runtime::Runtime(); + + runtime.block_on(server.route(json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a":1, "b":2}, + "id": 99, + }))); +} From a53fff2058a0a950c2b4b59e9a9d04d48c809dc2 Mon Sep 17 00:00:00 2001 From: xujian Date: Thu, 2 Jul 2020 02:21:17 -0400 Subject: [PATCH 2/8] fix some error --- src/data.rs | 6 ++++-- src/factory.rs | 13 +++++++------ src/lib.rs | 2 +- src/server.rs | 23 ++++++++++++----------- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/data.rs b/src/data.rs index 8c1bca2..9d4f0bf 100644 --- a/src/data.rs +++ b/src/data.rs @@ -28,13 +28,15 @@ pub(crate) trait DataFactory { pub struct DataExtensions(FxHashMap>); impl DataExtensions { - pub fn insert(&self, t: T) { - unsafe {self.0.insert(TypeId::of::(), Box::new(t));} + pub fn insert(&mut self, t: T) { + self.0.insert(TypeId::of::(), Box::new(t)); } } unsafe impl Sync for DataExtensions {} +unsafe impl Send for DataExtensions {} + impl Default for DataExtensions { fn default() -> Self { Self(FxHashMap::>::default()) diff --git a/src/factory.rs b/src/factory.rs index c8769c9..877da3b 100644 --- a/src/factory.rs +++ b/src/factory.rs @@ -1,6 +1,7 @@ use crate::data::{Data, DataExtensions, DataFactory}; use serde::{Deserialize, Serialize}; use std::future::Future; +use std::sync::Arc; pub(crate) trait Factory: Clone + 'static where @@ -21,14 +22,14 @@ where } } -impl Factory<(&DataExtensions,), (Data,), R, O> for F +impl Factory<(Arc,), (Data,), R, O> for F where F: Fn(Data) -> R + Clone + 'static, O: Serialize, R: Future, T: 'static, { - fn call(&self, params: (&DataExtensions,)) -> R { + fn call(&self, params: (Arc,)) -> R { (self)(params.0.get::>().unwrap().clone()) } } @@ -45,7 +46,7 @@ where } } -impl Factory<(&DataExtensions, P), (Data, P), R, O> for F +impl Factory<(Arc, P), (Data, P), R, O> for F where F: Fn(Data, P) -> R + Clone + 'static, O: Serialize, @@ -53,12 +54,12 @@ where P: for<'de> Deserialize<'de>, T: 'static, { - fn call(&self, params: (&DataExtensions, P)) -> R { + fn call(&self, params: (Arc, P)) -> R { (self)(params.0.get::>().unwrap().clone(), params.1) } } -impl Factory<(&DataExtensions, P), (P, Data), R, O> for F +impl Factory<(Arc, P), (P, Data), R, O> for F where F: Fn(P, Data) -> R + Clone + 'static, O: Serialize, @@ -66,7 +67,7 @@ where P: for<'de> Deserialize<'de>, T: 'static, { - fn call(&self, params: (&DataExtensions, P)) -> R { + fn call(&self, params: (Arc, P)) -> R { (self)(params.1, params.0.get::>().unwrap().clone()) } } diff --git a/src/lib.rs b/src/lib.rs index b3c4945..b7c7810 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ - +#![feature(get_mut_unchecked)] mod data; pub use data::Data; diff --git a/src/server.rs b/src/server.rs index 22d0f33..f40033f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use crate::factory::Factory; +use std::sync::Arc; #[derive(Deserialize, Debug)] pub struct Request { @@ -19,37 +20,37 @@ pub struct Request { pub struct Server { map: HashMap< String, - Box Pin + Send>>>, + Box, Request) -> Pin + Send>>>, >, - extensions: DataExtensions, + extensions: Arc, } impl Server { pub fn new() -> Self { Server { map: HashMap::new(), - extensions: DataExtensions::default(), + extensions: Arc::new(DataExtensions::default()), } } - pub fn to(mut self, key: String, handle: H) -> Self + pub fn to<'a, P, F, R, E, H, T>(mut self, key: String, handle: H) -> Self where P: for<'de> Deserialize<'de> + Send + 'static, R: Serialize + 'static, E: Serialize + Into + 'static, F: Future> + Send + 'static, - H: Factory<(&'static DataExtensions, P), (Data, P), F, Result> + Send + 'static, + H: Factory<(Arc, P), (Data, P), F, Result> + Clone + Send + 'static, T: 'static, { let inner_handle = - move |req: Request| -> Pin + Send>> { - async fn inner(extensions: &'static DataExtensions, req: Request, handle: H) -> serde_json::Value + |extensions: Arc, req: Request| -> Pin + Send>> { + async fn inner(extensions: Arc, req: Request, handle: H) -> serde_json::Value where P: for<'de> Deserialize<'de> + Send + 'static, R: Serialize + 'static, E: Serialize + Into + 'static, F: Future> + Send + 'static, - H: Factory<(&'static DataExtensions, P), (Data, P), F, Result> + Send + 'static, + H: Factory<(Arc, P), (Data, P), F, Result> + Clone + Send + 'static, T: 'static, { let params: P = serde_json::from_value(req.params).unwrap(); @@ -65,14 +66,14 @@ impl Server { } } } - Box::pin(inner(&self.extensions, req, handle)) + Box::pin(inner(extensions.clone(), req, handle.clone())) }; self.map.insert(key, Box::new(inner_handle)); self } pub fn data(mut self, d: D) -> Self { - self.extensions.insert(Data::new(d)); + Arc::get_mut(&mut self.extensions).unwrap().insert(Data::new(d)); self } @@ -85,6 +86,6 @@ impl Server { None => return serde_json::to_value(JsonRpc::error(req.id, JsonRpcError::method_not_found())).unwrap(), }; - handle(req).await + handle(self.extensions.clone(), req).await } } From 55dcc3e045e3939de502bad3107e12ad204c37c9 Mon Sep 17 00:00:00 2001 From: xujian Date: Thu, 2 Jul 2020 22:39:08 -0400 Subject: [PATCH 3/8] fix some error, add testcase --- Cargo.toml | 2 +- src/factory.rs | 63 +++++---------------------------- src/lib.rs | 1 + src/server.rs | 23 +++++++----- tests/test.rs | 96 ++++++++++++++++++++++++++++++++++++++++++-------- 5 files changed, 105 insertions(+), 80 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f89574..b8f0be0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,5 @@ serde_json = "1.0" jsonrpc-lite = "0.5.0" fxhash = "0.2.1" -[dependencies-dev] +[dev-dependencies] tokio = { version = "0.2", features = ["full"] } diff --git a/src/factory.rs b/src/factory.rs index 877da3b..6f08997 100644 --- a/src/factory.rs +++ b/src/factory.rs @@ -1,73 +1,26 @@ -use crate::data::{Data, DataExtensions, DataFactory}; +use crate::data::{Data}; use serde::{Deserialize, Serialize}; use std::future::Future; -use std::sync::Arc; -pub(crate) trait Factory: Clone + 'static -where - O: Serialize, - R: Future, -{ - fn call(&self, params: T) -> R; -} - -impl Factory<(), (), R, O> for F -where - F: Fn() -> R + Clone + 'static, - O: Serialize, - R: Future, -{ - fn call(&self, _: ()) -> R { - (self)() - } -} - -impl Factory<(Arc,), (Data,), R, O> for F -where - F: Fn(Data) -> R + Clone + 'static, - O: Serialize, - R: Future, - T: 'static, -{ - fn call(&self, params: (Arc,)) -> R { - (self)(params.0.get::>().unwrap().clone()) - } -} -impl Factory<(P,), (P,), R, O> for F +pub(crate) trait Factory: Clone + 'static where - F: Fn(P) -> R + Clone + 'static, O: Serialize, - R: Future, - P: for<'de> Deserialize<'de>, + R: Future + Send, { - fn call(&self, params: (P,)) -> R { - (self)(params.0) - } + fn call(&self, params: T) -> R; } -impl Factory<(Arc, P), (Data, P), R, O> for F +impl Factory<(Data, P), R, O> for F where F: Fn(Data, P) -> R + Clone + 'static, O: Serialize, - R: Future, + R: Future + Send, P: for<'de> Deserialize<'de>, T: 'static, { - fn call(&self, params: (Arc, P)) -> R { - (self)(params.0.get::>().unwrap().clone(), params.1) + fn call(&self, params: (Data, P)) -> R { + (self)(params.0, params.1) } } -impl Factory<(Arc, P), (P, Data), R, O> for F -where - F: Fn(P, Data) -> R + Clone + 'static, - O: Serialize, - R: Future, - P: for<'de> Deserialize<'de>, - T: 'static, -{ - fn call(&self, params: (Arc, P)) -> R { - (self)(params.1, params.0.get::>().unwrap().clone()) - } -} diff --git a/src/lib.rs b/src/lib.rs index b7c7810..69ae797 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![feature(get_mut_unchecked)] +#![feature(fn_traits)] mod data; pub use data::Data; diff --git a/src/server.rs b/src/server.rs index f40033f..732cec0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,7 @@ use std::future::Future; use std::pin::Pin; use crate::factory::Factory; use std::sync::Arc; +use crate::data::DataFactory; #[derive(Deserialize, Debug)] pub struct Request { @@ -39,23 +40,27 @@ impl Server { R: Serialize + 'static, E: Serialize + Into + 'static, F: Future> + Send + 'static, - H: Factory<(Arc, P), (Data, P), F, Result> + Clone + Send + 'static, - T: 'static, + H: Fn(Data, P) -> F + 'static + Clone + Send + Sync, + T: 'static + Sync + Send, { let inner_handle = - |extensions: Arc, req: Request| -> Pin + Send>> { + move |extensions: Arc, req: Request| -> Pin + Send>> { async fn inner(extensions: Arc, req: Request, handle: H) -> serde_json::Value where P: for<'de> Deserialize<'de> + Send + 'static, R: Serialize + 'static, E: Serialize + Into + 'static, F: Future> + Send + 'static, - H: Factory<(Arc, P), (Data, P), F, Result> + Clone + Send + 'static, - T: 'static, + H: Fn(Data, P) -> F + 'static + Clone + Send + Sync, + T: 'static + Sync + Send, { - let params: P = serde_json::from_value(req.params).unwrap(); - let _r = (handle).call((extensions, params)); - match _r.await { + let params: P = match serde_json::from_value(req.params){ + Ok(params) => params, + Err(err) => return serde_json::to_value(JsonRpc::error(req.id, JsonRpcError::invalid_params())).unwrap(), + }; + + let data_t = extensions.get::>().unwrap().clone(); + match (handle).call((data_t, params)).await { Ok(result) => serde_json::to_value(JsonRpc::success( req.id, &serde_json::to_value(result).unwrap(), @@ -66,7 +71,7 @@ impl Server { } } } - Box::pin(inner(extensions.clone(), req, handle.clone())) + Box::pin(inner(extensions, req, handle.clone())) }; self.map.insert(key, Box::new(inner_handle)); self diff --git a/tests/test.rs b/tests/test.rs index 6421459..01f625a 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -3,45 +3,111 @@ use jsonrpc_ws::Data; use std::sync::Mutex; use tokio::time::{self, Duration}; use serde_json::json; +use serde::{Serialize, Deserialize}; +use jsonrpc_lite::Error as JsonRpcError; #[derive(Debug)] -pub struct Test { +pub struct ShareStateTest { pub a: Mutex, pub b: Mutex, } -async fn route_a(local_test: Data) { +#[derive(Deserialize)] +pub struct ReqTest { + pub a: u32, + pub b: String, + pub c: Vec, +} + +#[derive(Serialize)] +pub struct RespTest { + pub a: u32, + pub b: String, + pub c: Vec, +} + +#[derive(Debug, Serialize)] +pub enum TestError { + // websock 错误 + WebSockServerBindError, + WebSockServerAcceptConnError, + WebSockServerGetPeerError, +} + +impl Into for TestError{ + fn into(self) -> JsonRpcError { + JsonRpcError{code: 1000i64, message: "test".to_string(), data: None} + } +} + +async fn route_a(local_test: Data) { let mut a = *local_test.get_ref().a.lock().unwrap(); a += 1; assert_eq!(100u32, a); } -async fn route_b(local_test: Data, req: Request) { +async fn route_b(local_test: Data, req: ReqTest) -> Result { let timeout = 1000u64; - time::delay_for(Duration::from_millis(timeout.into())); + time::delay_for(Duration::from_millis(timeout.into())).await; let mut a = *local_test.get_ref().a.lock().unwrap(); - a += 1; - assert_eq!(100u32, a); + + let mut new_resp_c = Vec::::new(); + new_resp_c.extend_from_slice(&req.c[..]); + new_resp_c.push("add".to_string()); + + Ok(RespTest{a: a, b: req.b, c: req.c}) } #[test] fn test_server() { - let mut server = Server::new() - .data(Test { + let server = Server::new() + .data(ShareStateTest { a: Mutex::new(99u32), b: Mutex::new("abcdefg".to_string()), }) .to("route_b".to_string(), route_b); - let runtime = tokio::runtime::Runtime(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); - runtime.block_on(server.route(json!({ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"a":1, "b":2}, - "id": 99, - }))); + runtime.block_on(async move{ + let resp = server.route(json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 99, + })).await; + + assert_eq!( + json!({ + "error":{ + "code":-32602, + "message":"Invalid params" + }, + "id":99, + "jsonrpc":"2.0" + }), + resp + ); + + let resp = server.route(json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 99u32, "b": "Test中文", "c": ["tset", "中文"]}, + "id": 2, + })).await; + + println!("{}", resp); + + let resp = server.route(json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 99u32, "b": "Test中文", "c": ["tset", "中文"]}, + "id": 3, + })).await; + + println!("{}", resp); + }); } From 808feef10d51474c59f90d3db33edb91370c174c Mon Sep 17 00:00:00 2001 From: xujian Date: Mon, 6 Jul 2020 04:16:30 -0400 Subject: [PATCH 4/8] complate left feature and add testcase --- Cargo.toml | 2 +- src/factory.rs | 4 +- src/lib.rs | 2 +- src/macros.rs | 205 +++++++++++++++++++++++++++++++++++++++++++ src/server.rs | 91 +++++++++++-------- tests/test.rs | 231 ++++++++++++++++++++++++++++++++++++++----------- 6 files changed, 443 insertions(+), 92 deletions(-) create mode 100644 src/macros.rs diff --git a/Cargo.toml b/Cargo.toml index b8f0be0..75db787 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,4 @@ jsonrpc-lite = "0.5.0" fxhash = "0.2.1" [dev-dependencies] -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.2", features = ["full"] } \ No newline at end of file diff --git a/src/factory.rs b/src/factory.rs index 6f08997..4431a0e 100644 --- a/src/factory.rs +++ b/src/factory.rs @@ -1,8 +1,7 @@ -use crate::data::{Data}; +use crate::data::Data; use serde::{Deserialize, Serialize}; use std::future::Future; - pub(crate) trait Factory: Clone + 'static where O: Serialize, @@ -23,4 +22,3 @@ where (self)(params.0, params.1) } } - diff --git a/src/lib.rs b/src/lib.rs index 69ae797..d64d511 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(get_mut_unchecked)] +#![feature(type_alias_impl_trait)] #![feature(fn_traits)] mod data; diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 0000000..5ab729b --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,205 @@ +use jsonrpc_lite::JsonRpc::{Success, Error}; +use jsonrpc_lite::JsonRpc; +use serde_json::{Value, json}; +use alloc::collections::btree_map::BTreeMap; +use serde::{Serialize, Deserialize}; +use jsonrpc_lite::Error as JsonRpcError; +use jsonrpc_lite::Params as JsonrpcParams; +use std::future::Future; +use crate::data::{DataExtensions, Data}; +use std::sync::Arc; +use std::pin::Pin; +use std::any::{Any, TypeId}; +use crate::data::DataFactory; +use crate::error::WallerError; + + +#[derive(Deserialize, Debug)] +pub struct Request { + pub jsonrpc: String, + pub method: String, + pub params: serde_json::Value, + pub id: i64, +} + +trait Route{ + type OutputT: Future + Clone + Send; + fn route(&self, app_data: Arc, req_param: Value) -> Self::OutputT; +} + +pub struct RouteFn Deserialize<'de>, + R: Serialize, + E: Into >{ + fn_call: Box, P) -> Pin> + Send + 'static>>>, + + app_data: Arc, +} + +impl Deserialize<'de>, +R: Serialize, +E: Into > RouteFn{ + pub fn new( + fn_call: Box, P) -> Pin> + Send + 'static>>>, + app_data: Arc) -> Self { + Self{ + fn_call, + app_data + } + } +} + +impl Route for RouteFn +where + P: for<'de> Deserialize<'de>, + R: Serialize, + E: Into +{ + type OutputT = impl Future + Send; + + fn route(&self, app_data: Arc, req: Request) -> Self::OutputT{ + let fn_call = self.fn_call.clone(); + async move{ + let datad = app_data.get::>().unwrap().clone(); + let p: P = serde_json::from_value(req.params).unwrap(); + match fn_call(datad, p).await { + Ok(result) => serde_json::to_value(JsonRpc::success( + req.id, + &serde_json::to_value(result).unwrap(), + )) + .unwrap(), + Err(err) => { + serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() + } + } + } + } +} + +type WalletRouteFn = RouteFn; + + +impl Route for RouteFn +where + D, + P: for<'de> Deserialize<'de>, + R: Serialize, + E: Into, +{ + type Output = impl Future; + + fn route(&self, app_data: Arc, req_param: Value) -> Output{ + async move{ + let datad = params.0.get::>().unwrap().clone(); + let p: P = serde_json::from_value(req_param); + match self.fn_call(datad, p).await { + Ok(result) => serde_json::to_value(JsonRpc::success( + req.id, + &serde_json::to_value(result).unwrap(), + )) + .unwrap(), + Err(err) => { + serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() + } + } + } + } +} + +struct Executer{ + routes: BTreeMap, +} + + +impl Executer{ + fn find_route(&self, method: &str) -> Result<&Route, ()> { + if let Some(route) = self.routes.get(method) { + Ok(&route) + } + else{ + return Err(()); + } + } + + fn execute_jsonstr(json_str: &str) -> String { + let json_data = serde_json::from_str(json_str) { + Ok(json_data) => json_data, + Err(_) => return serde_json::to_string(); + }; + + match json_data{ + Value::Object(_) => serde_json::to_string( + self.execute_once( + JsonRpc::parse(json_data).unwrap_or(JsonRpc::error((), JsonrpcError::invalid_request())))), + Value::Array(array) => serde_json::to_string( + array.map(|obj| self.execute_once(JsonRpc::parse_vec(obj). + unwrap_or(JsonRpc::error((), JsonrpcError::invalid_request())))).collect()), + _ => return serde_json::to_string(JsonRpc::error((), JsonrpcError::parse_error())), + } + } + + fn execute_once(&self, req_json: JsonRpc) -> JsonRpc { + let route = match self.find_route(req_json.get_method()){ + Ok(route) => route, + Err(_) => return JsonRpc::error((), JsonrpcError::method_not_found()), + }; + + match route.execute(req_json.get_params()) { + Ok(ans) => ans, + Err(err) => err, + } + } +} + + +macro_rules! json_rpc_dispatch { + ( $( $fn_name:expr => $fn_ident:ident => $param_type:ty ),*) => {{ + let mut json_rpc_dispatcher = BTreeMap::::new(); + + $( + if let Some(_) = json_rpc_dispatcher.get(&$fn_name) { + panic!("json_rpc method {} has registed", $fn_name); + } + json_rpc_dispatcher.insert($fn_name, NewType{a: 1u64, b: 99u64}); + )* + + json_rpc_dispatcher + }}; +} + +// macro_rules! get_fn_name { +// ($fn_ident:block) => { +// format!("{:?}", $fn_ident) +// } +// } + +#[cfg(test)] +mod tests { + use super::*; + use serde::{Serialize, Deserialize}; + use jsonrpc_lite::JsonRpc::{Request, Success, Error}; + use jsonrpc_lite::JsonRpc; + use alloc::string::String; + + #[derive(Serialize, Deserialize)] + struct RequestA { + param_1: String, + param_2: String, + param_3: u32, + } + + fn fn_testa(req_data: RequestA) -> JsonRpc { + JsonRpc::success(99i64, &json!({"ans": req_data.param_2})) + } + + #[test] + fn test_dispatch(){ + trace_macros!(true); + let dispatcher = json_rpc_dispatch!( + "fn_testa".to_string() => fn_testa => RequestA + ); + trace_macros!(false); + + println!("{:?}", dispatcher); + } +} \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 732cec0..143e952 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,27 +1,26 @@ +use crate::data::DataFactory; use crate::data::{Data, DataExtensions}; use jsonrpc_lite::Error as JsonRpcError; use jsonrpc_lite::JsonRpc; use serde::{Deserialize, Serialize}; +use serde_json::Value; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; -use crate::factory::Factory; use std::sync::Arc; -use crate::data::DataFactory; #[derive(Deserialize, Debug)] pub struct Request { pub jsonrpc: String, pub method: String, - pub params: serde_json::Value, + pub params: Value, pub id: i64, } - pub struct Server { map: HashMap< String, - Box, Request) -> Pin + Send>>>, + Box, Request) -> Pin + Send>>>, >, extensions: Arc, } @@ -43,54 +42,74 @@ impl Server { H: Fn(Data, P) -> F + 'static + Clone + Send + Sync, T: 'static + Sync + Send, { - let inner_handle = - move |extensions: Arc, req: Request| -> Pin + Send>> { - async fn inner(extensions: Arc, req: Request, handle: H) -> serde_json::Value - where - P: for<'de> Deserialize<'de> + Send + 'static, - R: Serialize + 'static, - E: Serialize + Into + 'static, - F: Future> + Send + 'static, - H: Fn(Data, P) -> F + 'static + Clone + Send + Sync, - T: 'static + Sync + Send, - { - let params: P = match serde_json::from_value(req.params){ - Ok(params) => params, - Err(err) => return serde_json::to_value(JsonRpc::error(req.id, JsonRpcError::invalid_params())).unwrap(), - }; - - let data_t = extensions.get::>().unwrap().clone(); - match (handle).call((data_t, params)).await { - Ok(result) => serde_json::to_value(JsonRpc::success( + let inner_handle = move |extensions: Arc, + req: Request| + -> Pin + Send>> { + async fn inner( + extensions: Arc, + req: Request, + handle: H, + ) -> Value + where + P: for<'de> Deserialize<'de> + Send + 'static, + R: Serialize + 'static, + E: Serialize + Into + 'static, + F: Future> + Send + 'static, + H: Fn(Data, P) -> F + 'static + Clone + Send + Sync, + T: 'static + Sync + Send, + { + let params: P = match serde_json::from_value(req.params) { + Ok(params) => params, + Err(_) => { + return serde_json::to_value(JsonRpc::error( req.id, - &serde_json::to_value(result).unwrap(), + JsonRpcError::invalid_params(), )) - .unwrap(), - Err(err) => { - serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() - } + .unwrap() } + }; + + let data_t = extensions.get::>().unwrap().clone(); + match (handle).call((data_t, params)).await { + Ok(result) => serde_json::to_value(JsonRpc::success( + req.id, + &serde_json::to_value(result).unwrap(), + )) + .unwrap(), + Err(err) => serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap(), } - Box::pin(inner(extensions, req, handle.clone())) - }; + } + Box::pin(inner(extensions, req, handle.clone())) + }; self.map.insert(key, Box::new(inner_handle)); self } pub fn data(mut self, d: D) -> Self { - Arc::get_mut(&mut self.extensions).unwrap().insert(Data::new(d)); + Arc::get_mut(&mut self.extensions) + .unwrap() + .insert(Data::new(d)); self } /// 传入一个Value格式的json-rpc单独请求 - /// 返回响应 - pub async fn route(&self, req_str: serde_json::Value) -> serde_json::Value { + /// 立刻返回响应执行Future或者错误结果 + pub async fn route_once( + &self, + req_str: Value, + ) -> Result + Send>>, Value> { let req: Request = serde_json::from_value(req_str).unwrap(); let handle = match self.map.get(&req.method) { Some(handle) => handle, - None => return serde_json::to_value(JsonRpc::error(req.id, JsonRpcError::method_not_found())).unwrap(), + None => { + return Err(serde_json::to_value(JsonRpc::error( + req.id, + JsonRpcError::method_not_found(), + )) + .unwrap()) + } }; - handle(self.extensions.clone(), req).await + Ok(handle(self.extensions.clone(), req)) } } diff --git a/tests/test.rs b/tests/test.rs index 01f625a..7c766ca 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,27 +1,29 @@ -use jsonrpc_ws::server::{Request, Server}; +use jsonrpc_lite::Error as JsonRpcError; +use jsonrpc_lite::JsonRpc; +use jsonrpc_ws::server::Server; use jsonrpc_ws::Data; -use std::sync::Mutex; -use tokio::time::{self, Duration}; +use serde::{Deserialize, Serialize}; use serde_json::json; -use serde::{Serialize, Deserialize}; -use jsonrpc_lite::Error as JsonRpcError; +use serde_json::Value; +use std::sync::{Arc, Mutex}; +use tokio::time::{self, Duration}; #[derive(Debug)] pub struct ShareStateTest { - pub a: Mutex, + pub a: Mutex, pub b: Mutex, } #[derive(Deserialize)] pub struct ReqTest { - pub a: u32, + pub a: u64, pub b: String, pub c: Vec, } -#[derive(Serialize)] +#[derive(Deserialize, Serialize)] pub struct RespTest { - pub a: u32, + pub a: u64, pub b: String, pub c: Vec, } @@ -34,51 +36,111 @@ pub enum TestError { WebSockServerGetPeerError, } -impl Into for TestError{ +impl Into for TestError { fn into(self) -> JsonRpcError { - JsonRpcError{code: 1000i64, message: "test".to_string(), data: None} + JsonRpcError { + code: 1000i64, + message: "test".to_string(), + data: None, + } } } -async fn route_a(local_test: Data) { - let mut a = *local_test.get_ref().a.lock().unwrap(); - - a += 1; - assert_eq!(100u32, a); -} - async fn route_b(local_test: Data, req: ReqTest) -> Result { - let timeout = 1000u64; - time::delay_for(Duration::from_millis(timeout.into())).await; + time_sleep(1000).await; - let mut a = *local_test.get_ref().a.lock().unwrap(); - a += 1; + let mut a = local_test.get_ref().a.lock().unwrap(); + *a += 1; let mut new_resp_c = Vec::::new(); new_resp_c.extend_from_slice(&req.c[..]); - new_resp_c.push("add".to_string()); + new_resp_c.push(format!(" add {}", *a)); + + Ok(RespTest { + a: *a + req.a, + b: req.b, + c: new_resp_c, + }) +} + +async fn time_sleep(timeout_ms: u64) { + time::delay_for(Duration::from_millis(timeout_ms.into())).await; +} + +fn server_route_error() -> JsonRpcError { + JsonRpcError { + code: -32500, + message: "Server Internal Route error".to_string(), + data: None, + } +} - Ok(RespTest{a: a, b: req.b, c: req.c}) +/// 传入jsonrpc请求 +/// 返回结果 +pub async fn route_jsonrpc(server: Arc, req_str: Value) -> Value { + match req_str { + Value::Object(_) => match server.route_once(req_str).await { + Ok(fut) => fut.await, + Err(err) => err, + }, + Value::Array(array) => { + let localtask = tokio::task::LocalSet::new(); + let share_outputs = Arc::new(Mutex::new(Vec::::new())); + + for each in array { + let inner_server = Arc::downgrade(&server); + let share_outputs = share_outputs.clone(); + + localtask.spawn_local(async move { + // task开始执行是尝试获取server对象 + let output = match inner_server.upgrade() { + Some(server) => match server.route_once(each).await { + Ok(fut) => fut.await, + Err(err) => err, + }, + None => serde_json::to_value(server_route_error()).unwrap(), + }; + + let mut outputs = share_outputs.lock().unwrap(); + outputs.push(output); + }); + } + localtask.await; + + // TODO 内部panic可能要处理 + // outputs Arc持有者只剩下一个,此处取出不会失败,也不考虑失败处理 + let output = if let Ok(outputs) = Arc::try_unwrap(share_outputs) { + // 锁持有者同理 + outputs.into_inner().unwrap() + } else { + panic!("Arc> into_inner failed"); + }; + Value::Array(output) + } + _ => return serde_json::to_value(JsonRpc::error((), JsonRpcError::parse_error())).unwrap(), + } } #[test] -fn test_server() { +fn test_server_simple() { let server = Server::new() .data(ShareStateTest { - a: Mutex::new(99u32), + a: Mutex::new(100u64), b: Mutex::new("abcdefg".to_string()), }) .to("route_b".to_string(), route_b); - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - - runtime.block_on(async move{ - let resp = server.route(json!({ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"err_param": 1}, - "id": 99, - })).await; + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async move { + let resp = server + .route_once(json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 99, + })) + .await; assert_eq!( json!({ @@ -89,25 +151,92 @@ fn test_server() { "id":99, "jsonrpc":"2.0" }), - resp + resp.unwrap().await + ); + }); +} + +#[tokio::test] +async fn test_server_route_and_array() { + let server = Arc::new( + Server::new() + .data(ShareStateTest { + a: Mutex::new(100u64), + b: Mutex::new("abcdefg".to_string()), + }) + .to("route_b".to_string(), route_b), + ); + + let tasks = async move { + let resp = route_jsonrpc( + server.clone(), + json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 99, + }), + ) + .await; + + assert_eq!( + json!({"error":{"code":-32602,"message":"Invalid params"},"id":99,"jsonrpc":"2.0"}) + .to_string(), + resp.to_string() ); - let resp = server.route(json!({ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"a": 99u32, "b": "Test中文", "c": ["tset", "中文"]}, - "id": 2, - })).await; + let resp = route_jsonrpc( + server.clone(), + json!([{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 91, + },{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, + "id": 92, + },{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, + "id": 93, + }]), + ) + .await; + + let resp_vec = match resp { + Value::Array(array) => array, + _ => panic!("unexpect error"), + }; + + let ans_91: Vec<&Value> = resp_vec + .iter() + .filter(|&resp| resp["id"].as_u64().unwrap() == 91) + .collect(); + + assert_eq!(1, ans_91.len()); + assert_eq!( + "Invalid params", + ans_91[0]["error"]["message"].as_str().unwrap() + ); - println!("{}", resp); + let ans_9293_a_sum = resp_vec + .iter() + .filter(|&resp| { + resp["id"].as_u64().unwrap() == 92 || resp["id"].as_u64().unwrap() == 93 + }) + .fold(0, |sum, resp| { + sum + serde_json::from_value::(resp["result"].clone()) + .unwrap() + .a + }); - let resp = server.route(json!({ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"a": 99u32, "b": "Test中文", "c": ["tset", "中文"]}, - "id": 3, - })).await; + assert_eq!(8888u64 * 2 + 101 + 102, ans_9293_a_sum); + }; - println!("{}", resp); - }); + let local = tokio::task::LocalSet::new(); + + local.run_until(tasks).await; } From 5725db28936590e668dbc9096decca9004a10fe7 Mon Sep 17 00:00:00 2001 From: xujian Date: Mon, 6 Jul 2020 04:19:38 -0400 Subject: [PATCH 5/8] remove a unused rs file --- src/macros.rs | 205 -------------------------------------------------- 1 file changed, 205 deletions(-) delete mode 100644 src/macros.rs diff --git a/src/macros.rs b/src/macros.rs deleted file mode 100644 index 5ab729b..0000000 --- a/src/macros.rs +++ /dev/null @@ -1,205 +0,0 @@ -use jsonrpc_lite::JsonRpc::{Success, Error}; -use jsonrpc_lite::JsonRpc; -use serde_json::{Value, json}; -use alloc::collections::btree_map::BTreeMap; -use serde::{Serialize, Deserialize}; -use jsonrpc_lite::Error as JsonRpcError; -use jsonrpc_lite::Params as JsonrpcParams; -use std::future::Future; -use crate::data::{DataExtensions, Data}; -use std::sync::Arc; -use std::pin::Pin; -use std::any::{Any, TypeId}; -use crate::data::DataFactory; -use crate::error::WallerError; - - -#[derive(Deserialize, Debug)] -pub struct Request { - pub jsonrpc: String, - pub method: String, - pub params: serde_json::Value, - pub id: i64, -} - -trait Route{ - type OutputT: Future + Clone + Send; - fn route(&self, app_data: Arc, req_param: Value) -> Self::OutputT; -} - -pub struct RouteFn Deserialize<'de>, - R: Serialize, - E: Into >{ - fn_call: Box, P) -> Pin> + Send + 'static>>>, - - app_data: Arc, -} - -impl Deserialize<'de>, -R: Serialize, -E: Into > RouteFn{ - pub fn new( - fn_call: Box, P) -> Pin> + Send + 'static>>>, - app_data: Arc) -> Self { - Self{ - fn_call, - app_data - } - } -} - -impl Route for RouteFn -where - P: for<'de> Deserialize<'de>, - R: Serialize, - E: Into -{ - type OutputT = impl Future + Send; - - fn route(&self, app_data: Arc, req: Request) -> Self::OutputT{ - let fn_call = self.fn_call.clone(); - async move{ - let datad = app_data.get::>().unwrap().clone(); - let p: P = serde_json::from_value(req.params).unwrap(); - match fn_call(datad, p).await { - Ok(result) => serde_json::to_value(JsonRpc::success( - req.id, - &serde_json::to_value(result).unwrap(), - )) - .unwrap(), - Err(err) => { - serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() - } - } - } - } -} - -type WalletRouteFn = RouteFn; - - -impl Route for RouteFn -where - D, - P: for<'de> Deserialize<'de>, - R: Serialize, - E: Into, -{ - type Output = impl Future; - - fn route(&self, app_data: Arc, req_param: Value) -> Output{ - async move{ - let datad = params.0.get::>().unwrap().clone(); - let p: P = serde_json::from_value(req_param); - match self.fn_call(datad, p).await { - Ok(result) => serde_json::to_value(JsonRpc::success( - req.id, - &serde_json::to_value(result).unwrap(), - )) - .unwrap(), - Err(err) => { - serde_json::to_value(JsonRpc::error(req.id, err.into())).unwrap() - } - } - } - } -} - -struct Executer{ - routes: BTreeMap, -} - - -impl Executer{ - fn find_route(&self, method: &str) -> Result<&Route, ()> { - if let Some(route) = self.routes.get(method) { - Ok(&route) - } - else{ - return Err(()); - } - } - - fn execute_jsonstr(json_str: &str) -> String { - let json_data = serde_json::from_str(json_str) { - Ok(json_data) => json_data, - Err(_) => return serde_json::to_string(); - }; - - match json_data{ - Value::Object(_) => serde_json::to_string( - self.execute_once( - JsonRpc::parse(json_data).unwrap_or(JsonRpc::error((), JsonrpcError::invalid_request())))), - Value::Array(array) => serde_json::to_string( - array.map(|obj| self.execute_once(JsonRpc::parse_vec(obj). - unwrap_or(JsonRpc::error((), JsonrpcError::invalid_request())))).collect()), - _ => return serde_json::to_string(JsonRpc::error((), JsonrpcError::parse_error())), - } - } - - fn execute_once(&self, req_json: JsonRpc) -> JsonRpc { - let route = match self.find_route(req_json.get_method()){ - Ok(route) => route, - Err(_) => return JsonRpc::error((), JsonrpcError::method_not_found()), - }; - - match route.execute(req_json.get_params()) { - Ok(ans) => ans, - Err(err) => err, - } - } -} - - -macro_rules! json_rpc_dispatch { - ( $( $fn_name:expr => $fn_ident:ident => $param_type:ty ),*) => {{ - let mut json_rpc_dispatcher = BTreeMap::::new(); - - $( - if let Some(_) = json_rpc_dispatcher.get(&$fn_name) { - panic!("json_rpc method {} has registed", $fn_name); - } - json_rpc_dispatcher.insert($fn_name, NewType{a: 1u64, b: 99u64}); - )* - - json_rpc_dispatcher - }}; -} - -// macro_rules! get_fn_name { -// ($fn_ident:block) => { -// format!("{:?}", $fn_ident) -// } -// } - -#[cfg(test)] -mod tests { - use super::*; - use serde::{Serialize, Deserialize}; - use jsonrpc_lite::JsonRpc::{Request, Success, Error}; - use jsonrpc_lite::JsonRpc; - use alloc::string::String; - - #[derive(Serialize, Deserialize)] - struct RequestA { - param_1: String, - param_2: String, - param_3: u32, - } - - fn fn_testa(req_data: RequestA) -> JsonRpc { - JsonRpc::success(99i64, &json!({"ans": req_data.param_2})) - } - - #[test] - fn test_dispatch(){ - trace_macros!(true); - let dispatcher = json_rpc_dispatch!( - "fn_testa".to_string() => fn_testa => RequestA - ); - trace_macros!(false); - - println!("{:?}", dispatcher); - } -} \ No newline at end of file From 861f75b75e66219f3b4023f981d182381e8cf46e Mon Sep 17 00:00:00 2001 From: xujian Date: Tue, 7 Jul 2020 06:16:54 -0400 Subject: [PATCH 6/8] move function route_jsonrpc to file route, modify param type to string --- Cargo.toml | 1 + src/lib.rs | 12 ++++- src/{server.rs => route.rs} | 75 ++++++++++++++++++++++++++++-- tests/test.rs | 91 ++++++++----------------------------- 4 files changed, 101 insertions(+), 78 deletions(-) rename src/{server.rs => route.rs} (61%) diff --git a/Cargo.toml b/Cargo.toml index 75db787..03703a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" jsonrpc-lite = "0.5.0" fxhash = "0.2.1" +futures-util = "0.3.5" [dev-dependencies] tokio = { version = "0.2", features = ["full"] } \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index d64d511..27f8752 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,4 +6,14 @@ pub use data::Data; mod factory; -pub mod server; +pub mod route; + +use jsonrpc_lite::Error as JsonRpcError; + +fn server_route_error() -> JsonRpcError { + JsonRpcError { + code: -32500, + message: "Server Internal Route error".to_string(), + data: None, + } +} \ No newline at end of file diff --git a/src/server.rs b/src/route.rs similarity index 61% rename from src/server.rs rename to src/route.rs index 143e952..bb3be2c 100644 --- a/src/server.rs +++ b/src/route.rs @@ -7,7 +7,10 @@ use serde_json::Value; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use futures_util::future::join_all; +use crate::server_route_error; + #[derive(Deserialize, Debug)] pub struct Request { @@ -17,7 +20,7 @@ pub struct Request { pub id: i64, } -pub struct Server { +pub struct Route { map: HashMap< String, Box, Request) -> Pin + Send>>>, @@ -25,9 +28,13 @@ pub struct Server { extensions: Arc, } -impl Server { +unsafe impl Sync for Route {} + +unsafe impl Send for Route {} + +impl Route { pub fn new() -> Self { - Server { + Route { map: HashMap::new(), extensions: Arc::new(DataExtensions::default()), } @@ -113,3 +120,63 @@ impl Server { Ok(handle(self.extensions.clone(), req)) } } + +/// 传入jsonrpc请求 +/// 返回结果 +pub async fn route_jsonrpc(server: Arc, req_str: &str) -> String { + let req: Value = match serde_json::from_str(req_str) { + Ok(req) => req, + Err(_) => { + return serde_json::to_value(JsonRpc::error((), JsonRpcError::parse_error())) + .unwrap() + .to_string() + } + }; + let resp = match req { + Value::Object(_) => match server.route_once(req).await { + Ok(fut) => fut.await, + Err(err) => err, + }, + Value::Array(array) => { + let share_outputs = Arc::new(Mutex::new(Vec::::new())); + let mut tasks = Vec::new(); + + for each in array { + let inner_server = Arc::downgrade(&server); + let share_outputs = share_outputs.clone(); + + tasks.push(async move { + // task开始执行是尝试获取server对象 + let output = match inner_server.upgrade() { + Some(server) => match server.route_once(each).await { + Ok(fut) => fut.await, + Err(err) => err, + }, + None => serde_json::to_value(server_route_error()).unwrap(), + }; + + let mut outputs = share_outputs.lock().unwrap(); + outputs.push(output); + }); + } + join_all(tasks).await; + + // TODO 内部panic可能要处理 + // outputs Arc持有者只剩下一个,此处取出不会失败,也不考虑失败处理 + let output = if let Ok(outputs) = Arc::try_unwrap(share_outputs) { + // 锁持有者同理 + outputs.into_inner().unwrap() + } else { + panic!("Arc> into_inner failed"); + }; + Value::Array(output) + } + _ => { + return serde_json::to_value(JsonRpc::error((), JsonRpcError::parse_error())) + .unwrap() + .to_string() + } + }; + + resp.to_string() +} \ No newline at end of file diff --git a/tests/test.rs b/tests/test.rs index 7c766ca..a984ca8 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,12 +1,13 @@ use jsonrpc_lite::Error as JsonRpcError; -use jsonrpc_lite::JsonRpc; -use jsonrpc_ws::server::Server; +use jsonrpc_ws::route::Route; use jsonrpc_ws::Data; use serde::{Deserialize, Serialize}; use serde_json::json; use serde_json::Value; use std::sync::{Arc, Mutex}; use tokio::time::{self, Duration}; +use jsonrpc_ws::route::route_jsonrpc; + #[derive(Debug)] pub struct ShareStateTest { @@ -67,63 +68,9 @@ async fn time_sleep(timeout_ms: u64) { time::delay_for(Duration::from_millis(timeout_ms.into())).await; } -fn server_route_error() -> JsonRpcError { - JsonRpcError { - code: -32500, - message: "Server Internal Route error".to_string(), - data: None, - } -} - -/// 传入jsonrpc请求 -/// 返回结果 -pub async fn route_jsonrpc(server: Arc, req_str: Value) -> Value { - match req_str { - Value::Object(_) => match server.route_once(req_str).await { - Ok(fut) => fut.await, - Err(err) => err, - }, - Value::Array(array) => { - let localtask = tokio::task::LocalSet::new(); - let share_outputs = Arc::new(Mutex::new(Vec::::new())); - - for each in array { - let inner_server = Arc::downgrade(&server); - let share_outputs = share_outputs.clone(); - - localtask.spawn_local(async move { - // task开始执行是尝试获取server对象 - let output = match inner_server.upgrade() { - Some(server) => match server.route_once(each).await { - Ok(fut) => fut.await, - Err(err) => err, - }, - None => serde_json::to_value(server_route_error()).unwrap(), - }; - - let mut outputs = share_outputs.lock().unwrap(); - outputs.push(output); - }); - } - localtask.await; - - // TODO 内部panic可能要处理 - // outputs Arc持有者只剩下一个,此处取出不会失败,也不考虑失败处理 - let output = if let Ok(outputs) = Arc::try_unwrap(share_outputs) { - // 锁持有者同理 - outputs.into_inner().unwrap() - } else { - panic!("Arc> into_inner failed"); - }; - Value::Array(output) - } - _ => return serde_json::to_value(JsonRpc::error((), JsonRpcError::parse_error())).unwrap(), - } -} - #[test] fn test_server_simple() { - let server = Server::new() + let route = Route::new() .data(ShareStateTest { a: Mutex::new(100u64), b: Mutex::new("abcdefg".to_string()), @@ -133,7 +80,7 @@ fn test_server_simple() { let mut runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async move { - let resp = server + let resp = route .route_once(json!({ "jsonrpc": "2.0", "method": "route_b", @@ -158,8 +105,8 @@ fn test_server_simple() { #[tokio::test] async fn test_server_route_and_array() { - let server = Arc::new( - Server::new() + let route = Arc::new( + Route::new() .data(ShareStateTest { a: Mutex::new(100u64), b: Mutex::new("abcdefg".to_string()), @@ -168,16 +115,16 @@ async fn test_server_route_and_array() { ); let tasks = async move { - let resp = route_jsonrpc( - server.clone(), - json!({ + let resp: Value = serde_json::from_str(&route_jsonrpc( + route.clone(), + &json!({ "jsonrpc": "2.0", "method": "route_b", "params": {"err_param": 1}, "id": 99, - }), + }).to_string(), ) - .await; + .await).unwrap(); assert_eq!( json!({"error":{"code":-32602,"message":"Invalid params"},"id":99,"jsonrpc":"2.0"}) @@ -185,9 +132,9 @@ async fn test_server_route_and_array() { resp.to_string() ); - let resp = route_jsonrpc( - server.clone(), - json!([{ + let resp: Value = serde_json::from_str(&route_jsonrpc( + route.clone(), + &json!([{ "jsonrpc": "2.0", "method": "route_b", "params": {"err_param": 1}, @@ -202,9 +149,9 @@ async fn test_server_route_and_array() { "method": "route_b", "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, "id": 93, - }]), + }]).to_string(), ) - .await; + .await).unwrap(); let resp_vec = match resp { Value::Array(array) => array, @@ -236,7 +183,5 @@ async fn test_server_route_and_array() { assert_eq!(8888u64 * 2 + 101 + 102, ans_9293_a_sum); }; - let local = tokio::task::LocalSet::new(); - - local.run_until(tasks).await; + tokio::spawn(tasks).await.unwrap(); } From 8e0f89e9c2c2c8baaeb5cad161f83a7273169534 Mon Sep 17 00:00:00 2001 From: xujian Date: Wed, 8 Jul 2020 00:36:34 -0400 Subject: [PATCH 7/8] commit websock part as a subproject --- Cargo.toml | 12 +- jsonrpc-websock/Cargo.toml | 28 ++++ jsonrpc-websock/examples/example_client.rs | 172 +++++++++++++++++++++ jsonrpc-websock/examples/example_server.rs | 106 +++++++++++++ jsonrpc-websock/src/lib.rs | 2 + jsonrpc-websock/src/server.rs | 121 +++++++++++++++ 6 files changed, 440 insertions(+), 1 deletion(-) create mode 100644 jsonrpc-websock/Cargo.toml create mode 100644 jsonrpc-websock/examples/example_client.rs create mode 100644 jsonrpc-websock/examples/example_server.rs create mode 100644 jsonrpc-websock/src/lib.rs create mode 100644 jsonrpc-websock/src/server.rs diff --git a/Cargo.toml b/Cargo.toml index 03703a0..756ff8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,16 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "jsonrpc_ws" +path = "src/lib.rs" + +[workspace] +members = [ + ".", + "jsonrpc-websock", +] + [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -14,4 +24,4 @@ fxhash = "0.2.1" futures-util = "0.3.5" [dev-dependencies] -tokio = { version = "0.2", features = ["full"] } \ No newline at end of file +tokio = { version = "0.2", features = ["full"] } diff --git a/jsonrpc-websock/Cargo.toml b/jsonrpc-websock/Cargo.toml new file mode 100644 index 0000000..312f1dc --- /dev/null +++ b/jsonrpc-websock/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "jsonrpc-websock" +version = "0.1.0" +authors = ["xujian "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +name = "jsonrpc_websock" +path = "src/lib.rs" + + +[dependencies] +tokio = { version = "0.2", features = ["full"] } +chrono = "0.4.11" +tokio-tungstenite = "0.10.1" +futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } +url = "2.0.0" +jsonrpc-lite = "0.5.0" +jsonrpc-ws = { path = "../../jsonrpc-ws" } +log = "0.4.8" + + +[dev-dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +env_logger = "0.7" \ No newline at end of file diff --git a/jsonrpc-websock/examples/example_client.rs b/jsonrpc-websock/examples/example_client.rs new file mode 100644 index 0000000..3c16dea --- /dev/null +++ b/jsonrpc-websock/examples/example_client.rs @@ -0,0 +1,172 @@ +use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; +use std::env; +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, + time::Duration, +}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::{spawn, time}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::WebSocketStream; +use url::Url; + +static LOCAL_SERVER: &'static str = "ws://127.0.0.1:9000"; + +const RECONN_INTERVAL: u64 = 3000; + +struct WebSockWriteHalf(pub Option, Message>>); +struct WebSockReadHalf(pub Option>>); + +async fn set_conn_none( + lock_ws_receiver: Arc>, + lock_ws_sender: Arc>, +) -> bool { + let mut ws_receiver = lock_ws_receiver.lock().unwrap(); + let mut ws_sender = lock_ws_sender.lock().unwrap(); + + ws_receiver.0 = None; + ws_sender.0 = None; + return true; +} + +async fn client_check_conn( + case_url: Url, + lock_ws_receiver: Arc>, + lock_ws_sender: Arc>, +) -> bool { + let ws_receiver = lock_ws_receiver.lock().unwrap(); + + if let None = ws_receiver.0 { + drop(ws_receiver); + + if let Ok((ws_stream, _)) = connect_async(case_url).await { + let (sender, receiver) = ws_stream.split(); + let mut ws_receiver = lock_ws_receiver.lock().unwrap(); + let mut ws_sender = lock_ws_sender.lock().unwrap(); + + ws_sender.0 = Some(sender); + ws_receiver.0 = Some(receiver); + log::info!("connect success"); + return true; + } else { + log::info!("connect fail, reconning ..."); + return false; + } + } + return true; +} + +async fn receiver_loop( + case_url: Url, + lock_ws_receiver: Arc>, + lock_ws_sender: Arc>, +) { + loop { + let mut ws_receiver = lock_ws_receiver.lock().unwrap(); + + let result: Result = match &mut ws_receiver.0 { + Some(ws_receiver) => match ws_receiver.next().await { + Some(Ok(msg)) => { + if msg.is_text() { + Ok(msg.into_text().unwrap()) + } else { + log::warn!("Peer receive data format error, not text"); + Err(false) + } + } + Some(Err(err)) => { + log::warn!("server close connect"); + Err(true) + } + None => Err(true), + }, + None => Err(true), + }; + drop(ws_receiver); + + match result { + Ok(msg) => { + println!("resp: {}", msg); + } + Err(is_reconn) => { + if is_reconn { + set_conn_none(lock_ws_receiver.clone(), lock_ws_sender.clone()).await; + if client_check_conn( + case_url.clone(), + lock_ws_receiver.clone(), + lock_ws_sender.clone(), + ) + .await + { + log::info!("re_conn: {}", case_url); + continue; + } else { + time::delay_for(Duration::from_millis(RECONN_INTERVAL)).await; + } + } + } + } + } +} + +async fn ws_send(str_cmd: String, lock_ws_sender: Arc>) { + let mut ws_sender = match lock_ws_sender.try_lock() { + Ok(ws_sender) => ws_sender, + Err(_) => { + time::delay_for(Duration::from_millis(100)).await; + + log::warn!("ws_stream close, skip send"); + return; + } + }; + + if let Some(ws_sender) = &mut ws_sender.0 { + if let Err(err) = ws_sender.send(Message::Text(str_cmd)).await { + log::warn!("ws_stream send failed with err: {}", err); + } + } else { + log::warn!("ws_stream close, skip send"); + } +} + +#[tokio::main] +async fn main() { + use env_logger::Env; + env_logger::from_env(Env::default().default_filter_or("warn")).init(); + + let connect_transport = env::args() + .nth(1) + .unwrap_or_else(|| LOCAL_SERVER.to_string()); + + let case_url = Url::parse(&connect_transport).expect("Bad testcase URL"); + + let lock_ws_receiver = Arc::new(Mutex::new(WebSockReadHalf(None))); + let lock_ws_sender = Arc::new(Mutex::new(WebSockWriteHalf(None))); + + let local = tokio::task::LocalSet::new(); + local + .run_until(async move { + tokio::task::spawn_local(receiver_loop( + case_url, + lock_ws_receiver.clone(), + lock_ws_sender.clone(), + )); + + let mut reader = BufReader::new(tokio::io::stdin()); + loop { + let mut str_cmd = String::new(); + reader.read_line(&mut str_cmd).await.unwrap(); + str_cmd.pop(); + + ws_send(str_cmd, lock_ws_sender.clone()).await; + } + }) + .await; +} diff --git a/jsonrpc-websock/examples/example_server.rs b/jsonrpc-websock/examples/example_server.rs new file mode 100644 index 0000000..57de238 --- /dev/null +++ b/jsonrpc-websock/examples/example_server.rs @@ -0,0 +1,106 @@ +extern crate jsonrpc_websock; + +use jsonrpc_lite::Error as JsonRpcError; +use jsonrpc_websock::WsServer; +use jsonrpc_ws::route::Route; +use jsonrpc_ws::Data; +use serde::{Deserialize, Serialize}; +use std::env; +use std::sync::{Arc, RwLock}; + +#[derive(Serialize)] +pub enum ExampleError { + // websock 错误 + ParamIsNone, +} + +impl Into for ExampleError { + fn into(self) -> JsonRpcError { + let (code, message) = match self { + ParamIsNone => (1000i64, "Param is none"), + _ => (9999i64, "Unexpect error"), + }; + + JsonRpcError { + code, + message: message.to_string(), + data: None, + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct CurrencyDetail { + value: u64, + id: String, + dcds: String, + locked: bool, + owner: String, +} + +pub struct CurrencyStore {} +impl CurrencyStore { + pub fn get_detail_by_ids( + &self, + req: GetDetailParam, + ) -> Result, ExampleError> { + if req.ids.len() == 0 { + return Err(ExampleError::ParamIsNone); + } + Ok(Vec::::new()) + } +} + +#[derive(Serialize, Deserialize)] +pub struct GetDetailParam { + ids: Vec, +} + +pub async fn get_detail_by_ids( + wallet: Data, + req: GetDetailParam, +) -> Result, ExampleError> { + let store = wallet.get_ref().store.try_read().unwrap(); + store.get_detail_by_ids(req) +} + +pub struct TSystem { + pub store: RwLock, +} + +impl TSystem { + pub fn new() -> Self { + Self { + store: RwLock::new(CurrencyStore {}), + } + } +} + +pub async fn start_ws_server(bind_transport: String) { + let route: Arc = Arc::new( + Route::new() + .data(TSystem::new()) + .to("currency.ids.detail".to_string(), get_detail_by_ids), + ); + + let ws_server = match WsServer::bind(bind_transport).await { + Ok(ws_server) => ws_server, + Err(err) => panic!("{}", err), + }; + + ws_server.listen_loop(route).await; +} + +static LOCAL_SERVER: &'static str = "127.0.0.1:9000"; + +#[tokio::main] +async fn main() { + use env_logger::Env; + env_logger::from_env(Env::default().default_filter_or("warn")).init(); + + let bind_transport = env::args() + .nth(1) + .unwrap_or_else(|| LOCAL_SERVER.to_string()); + + start_ws_server(bind_transport).await; +} diff --git a/jsonrpc-websock/src/lib.rs b/jsonrpc-websock/src/lib.rs new file mode 100644 index 0000000..bfa8fa5 --- /dev/null +++ b/jsonrpc-websock/src/lib.rs @@ -0,0 +1,2 @@ +mod server; +pub use server::WsServer; diff --git a/jsonrpc-websock/src/server.rs b/jsonrpc-websock/src/server.rs new file mode 100644 index 0000000..d29794c --- /dev/null +++ b/jsonrpc-websock/src/server.rs @@ -0,0 +1,121 @@ +use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; +use jsonrpc_ws::route::{route_jsonrpc, Route}; +use std::sync::Arc; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::WebSocketStream; + +type WebSockWriteHalf = SplitSink, Message>; +type WebSockReadHalf = SplitStream>; + +const REQ_QUEUE_LEN: usize = 10; + +pub struct WsServer { + bind_transport: String, + listener: TcpListener, +} + +impl WsServer { + pub async fn bind(bind_transport: String) -> Result { + let listener = TcpListener::bind(&bind_transport) + .await + .map_err(|err| err.to_string())?; + + log::info!("Listening on: {}", &bind_transport); + + let instance = Self { + bind_transport, + listener, + }; + + Ok(instance) + } + + pub async fn listen_loop(mut self, route: Arc) { + while let Ok((stream, _)) = self.listener.accept().await { + let route_ = route.clone(); + tokio::spawn(async move { + if let Err(err) = Self::client_loop(stream, route_).await { + log::warn!("{}", err); + } + }); + } + } + + async fn client_loop(stream: TcpStream, route: Arc) -> Result<(), String> { + let peer = stream + .peer_addr() + .map_err(|err| format!("get client peer_addr error, with info: {}", err))?; + + let mut ws_stream = accept_async(stream) + .await + .map_err(|err| format!("ws_stream accept error, with info: {}", err))?; + + log::info!("client {} connect", peer); + let (mut write_half, mut read_half) = ws_stream.split(); + + let (mut req_pipe_in, mut req_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); + let (mut resp_pipe_in, mut resp_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); + + tokio::select! { + _ = Self::dispatch_loop(route, req_pipe_out, resp_pipe_in) => { + log::info!("client {} close because dispatch_loop", peer); + }, + _ = Self::read_half_loop(read_half, req_pipe_in) => { + log::info!("client {} close because read_half", peer); + }, + _ = Self::write_half_loop(write_half, resp_pipe_out) => { + log::info!("client {} close because write_half", peer); + }, + }; + + Ok(()) + } + + async fn dispatch_loop( + route: Arc, + mut req_pipe: mpsc::Receiver, + mut resp_pipe: mpsc::Sender, + ) { + while let Some(req_str) = req_pipe.recv().await { + let route_ = route.clone(); + let resp_str = route_jsonrpc(route_, &req_str).await; + if let Err(_) = resp_pipe.send(resp_str).await { + // 处理完客户端已断开,忽略 + return; + } + } + } + + async fn read_half_loop(mut read_half: WebSockReadHalf, mut req_pipe_in: mpsc::Sender) { + while let Some(ans) = read_half.next().await { + match ans { + Err(err) => { + return; + } + Ok(Message::Text(msg_str)) => { + if let Err(_) = req_pipe_in.send(msg_str).await { + return; + } + } + Ok(Message::Ping(_)) => log::debug!("recv message ping/pong"), + Ok(Message::Pong(_)) => log::debug!("recv message ping/pong"), + Ok(_) => log::debug!("data format not String, ignore this item"), + } + } + } + + async fn write_half_loop( + mut write_half: WebSockWriteHalf, + mut resp_pipe_out: mpsc::Receiver, + ) { + while let Some(msg_str) = resp_pipe_out.recv().await { + if let Err(_) = write_half.send(Message::Text(msg_str)).await { + return; + } + } + } +} From 0dd324f43565ff4979b0ece943964b89cac64700 Mon Sep 17 00:00:00 2001 From: xujian Date: Wed, 8 Jul 2020 04:18:41 -0400 Subject: [PATCH 8/8] format code and rename lib name --- Cargo.toml | 6 +- .../Cargo.toml | 6 +- .../examples/example_client.rs | 10 +-- .../examples/example_server.rs | 11 ++- .../src/lib.rs | 0 .../src/server.rs | 18 ++--- src/lib.rs | 2 +- src/route.rs | 7 +- tests/test.rs | 71 ++++++++++--------- 9 files changed, 64 insertions(+), 67 deletions(-) rename {jsonrpc-websock => jsonrpc-websocket}/Cargo.toml (86%) rename {jsonrpc-websock => jsonrpc-websocket}/examples/example_client.rs (96%) rename {jsonrpc-websock => jsonrpc-websocket}/examples/example_server.rs (91%) rename {jsonrpc-websock => jsonrpc-websocket}/src/lib.rs (100%) rename {jsonrpc-websock => jsonrpc-websocket}/src/server.rs (88%) diff --git a/Cargo.toml b/Cargo.toml index 756ff8d..85d5929 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "jsonrpc-ws" +name = "jsonrpc" version = "0.1.0" authors = ["tiannian "] edition = "2018" @@ -7,13 +7,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] -name = "jsonrpc_ws" +name = "jsonrpc" path = "src/lib.rs" [workspace] members = [ ".", - "jsonrpc-websock", + "jsonrpc-websocket", ] [dependencies] diff --git a/jsonrpc-websock/Cargo.toml b/jsonrpc-websocket/Cargo.toml similarity index 86% rename from jsonrpc-websock/Cargo.toml rename to jsonrpc-websocket/Cargo.toml index 312f1dc..0df1174 100644 --- a/jsonrpc-websock/Cargo.toml +++ b/jsonrpc-websocket/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "jsonrpc-websock" +name = "jsonrpc-websocket" version = "0.1.0" authors = ["xujian "] edition = "2018" @@ -7,7 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] -name = "jsonrpc_websock" +name = "jsonrpc_websocket" path = "src/lib.rs" @@ -18,7 +18,7 @@ tokio-tungstenite = "0.10.1" futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } url = "2.0.0" jsonrpc-lite = "0.5.0" -jsonrpc-ws = { path = "../../jsonrpc-ws" } +jsonrpc = { path = "../../jsonrpc-ws" } log = "0.4.8" diff --git a/jsonrpc-websock/examples/example_client.rs b/jsonrpc-websocket/examples/example_client.rs similarity index 96% rename from jsonrpc-websock/examples/example_client.rs rename to jsonrpc-websocket/examples/example_client.rs index 3c16dea..ce87788 100644 --- a/jsonrpc-websock/examples/example_client.rs +++ b/jsonrpc-websocket/examples/example_client.rs @@ -2,16 +2,12 @@ use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; use std::env; use std::{ - future::Future, - pin::Pin, sync::{Arc, Mutex}, - task::{Context, Poll, Waker}, - thread, time::Duration, }; use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::{spawn, time}; +use tokio::net::TcpStream; +use tokio::time; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; @@ -81,7 +77,7 @@ async fn receiver_loop( Err(false) } } - Some(Err(err)) => { + Some(Err(_)) => { log::warn!("server close connect"); Err(true) } diff --git a/jsonrpc-websock/examples/example_server.rs b/jsonrpc-websocket/examples/example_server.rs similarity index 91% rename from jsonrpc-websock/examples/example_server.rs rename to jsonrpc-websocket/examples/example_server.rs index 57de238..eb8d188 100644 --- a/jsonrpc-websock/examples/example_server.rs +++ b/jsonrpc-websocket/examples/example_server.rs @@ -1,9 +1,9 @@ -extern crate jsonrpc_websock; +extern crate jsonrpc_websocket; +use jsonrpc::route::Route; +use jsonrpc::Data; use jsonrpc_lite::Error as JsonRpcError; -use jsonrpc_websock::WsServer; -use jsonrpc_ws::route::Route; -use jsonrpc_ws::Data; +use jsonrpc_websocket::WsServer; use serde::{Deserialize, Serialize}; use std::env; use std::sync::{Arc, RwLock}; @@ -17,8 +17,7 @@ pub enum ExampleError { impl Into for ExampleError { fn into(self) -> JsonRpcError { let (code, message) = match self { - ParamIsNone => (1000i64, "Param is none"), - _ => (9999i64, "Unexpect error"), + ExampleError::ParamIsNone => (1000i64, "Param is none"), }; JsonRpcError { diff --git a/jsonrpc-websock/src/lib.rs b/jsonrpc-websocket/src/lib.rs similarity index 100% rename from jsonrpc-websock/src/lib.rs rename to jsonrpc-websocket/src/lib.rs diff --git a/jsonrpc-websock/src/server.rs b/jsonrpc-websocket/src/server.rs similarity index 88% rename from jsonrpc-websock/src/server.rs rename to jsonrpc-websocket/src/server.rs index d29794c..3ab62e4 100644 --- a/jsonrpc-websock/src/server.rs +++ b/jsonrpc-websocket/src/server.rs @@ -1,6 +1,6 @@ use futures_util::stream::{SplitSink, SplitStream}; use futures_util::{SinkExt, StreamExt}; -use jsonrpc_ws::route::{route_jsonrpc, Route}; +use jsonrpc::route::{route_jsonrpc, Route}; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; @@ -14,7 +14,6 @@ type WebSockReadHalf = SplitStream>; const REQ_QUEUE_LEN: usize = 10; pub struct WsServer { - bind_transport: String, listener: TcpListener, } @@ -26,10 +25,7 @@ impl WsServer { log::info!("Listening on: {}", &bind_transport); - let instance = Self { - bind_transport, - listener, - }; + let instance = Self { listener }; Ok(instance) } @@ -50,15 +46,15 @@ impl WsServer { .peer_addr() .map_err(|err| format!("get client peer_addr error, with info: {}", err))?; - let mut ws_stream = accept_async(stream) + let ws_stream = accept_async(stream) .await .map_err(|err| format!("ws_stream accept error, with info: {}", err))?; log::info!("client {} connect", peer); - let (mut write_half, mut read_half) = ws_stream.split(); + let (write_half, read_half) = ws_stream.split(); - let (mut req_pipe_in, mut req_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); - let (mut resp_pipe_in, mut resp_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); + let (req_pipe_in, req_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); + let (resp_pipe_in, resp_pipe_out) = mpsc::channel(REQ_QUEUE_LEN); tokio::select! { _ = Self::dispatch_loop(route, req_pipe_out, resp_pipe_in) => { @@ -93,7 +89,7 @@ impl WsServer { async fn read_half_loop(mut read_half: WebSockReadHalf, mut req_pipe_in: mpsc::Sender) { while let Some(ans) = read_half.next().await { match ans { - Err(err) => { + Err(_) => { return; } Ok(Message::Text(msg_str)) => { diff --git a/src/lib.rs b/src/lib.rs index 27f8752..f1eff90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,4 +16,4 @@ fn server_route_error() -> JsonRpcError { message: "Server Internal Route error".to_string(), data: None, } -} \ No newline at end of file +} diff --git a/src/route.rs b/src/route.rs index bb3be2c..2a7ec41 100644 --- a/src/route.rs +++ b/src/route.rs @@ -1,5 +1,7 @@ use crate::data::DataFactory; use crate::data::{Data, DataExtensions}; +use crate::server_route_error; +use futures_util::future::join_all; use jsonrpc_lite::Error as JsonRpcError; use jsonrpc_lite::JsonRpc; use serde::{Deserialize, Serialize}; @@ -8,9 +10,6 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use futures_util::future::join_all; -use crate::server_route_error; - #[derive(Deserialize, Debug)] pub struct Request { @@ -179,4 +178,4 @@ pub async fn route_jsonrpc(server: Arc, req_str: &str) -> String { }; resp.to_string() -} \ No newline at end of file +} diff --git a/tests/test.rs b/tests/test.rs index a984ca8..f34e4bd 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,13 +1,12 @@ +use jsonrpc::route::route_jsonrpc; +use jsonrpc::route::Route; +use jsonrpc::Data; use jsonrpc_lite::Error as JsonRpcError; -use jsonrpc_ws::route::Route; -use jsonrpc_ws::Data; use serde::{Deserialize, Serialize}; use serde_json::json; use serde_json::Value; use std::sync::{Arc, Mutex}; use tokio::time::{self, Duration}; -use jsonrpc_ws::route::route_jsonrpc; - #[derive(Debug)] pub struct ShareStateTest { @@ -115,16 +114,20 @@ async fn test_server_route_and_array() { ); let tasks = async move { - let resp: Value = serde_json::from_str(&route_jsonrpc( - route.clone(), - &json!({ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"err_param": 1}, - "id": 99, - }).to_string(), + let resp: Value = serde_json::from_str( + &route_jsonrpc( + route.clone(), + &json!({ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 99, + }) + .to_string(), + ) + .await, ) - .await).unwrap(); + .unwrap(); assert_eq!( json!({"error":{"code":-32602,"message":"Invalid params"},"id":99,"jsonrpc":"2.0"}) @@ -132,26 +135,30 @@ async fn test_server_route_and_array() { resp.to_string() ); - let resp: Value = serde_json::from_str(&route_jsonrpc( - route.clone(), - &json!([{ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"err_param": 1}, - "id": 91, - },{ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, - "id": 92, - },{ - "jsonrpc": "2.0", - "method": "route_b", - "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, - "id": 93, - }]).to_string(), + let resp: Value = serde_json::from_str( + &route_jsonrpc( + route.clone(), + &json!([{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"err_param": 1}, + "id": 91, + },{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, + "id": 92, + },{ + "jsonrpc": "2.0", + "method": "route_b", + "params": {"a": 8888u64, "b":"_8888_", "c":["c","_string_","_8888_"]}, + "id": 93, + }]) + .to_string(), + ) + .await, ) - .await).unwrap(); + .unwrap(); let resp_vec = match resp { Value::Array(array) => array,