From f7aab6cb4346e3b789e951ac8ee3b42515071dec Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 23 Apr 2019 11:17:16 -0700 Subject: [PATCH] Initial commit --- .gitignore | 3 ++ Cargo.toml | 20 ++++++++++++++ LICENSE | 25 +++++++++++++++++ README.md | 14 ++++++++++ src/http-client.rs | 63 ++++++++++++++++++++++++++++++++++++++++++ src/http-server.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 193 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 src/http-client.rs create mode 100644 src/http-server.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6936990 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..09bffa8 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "tower-examples" +version = "0.1.0" +authors = ["Carl Lerche "] +edition = "2018" + +[[bin]] +name = "http-client" +path = "src/http-client.rs" + +[[bin]] +name = "http-server" +path = "src/http-server.rs" + +[dependencies] +futures = "0.1.26" +hyper = "0.12.27" +tokio = "0.1.19" +tower = { git = "https://github.com/tower-rs/tower" } +tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" } diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b980cac --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Tower Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7f0fd23 --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +# Examples of using Tower + +- [`http-client`](src/http-client.rs) - An HTTP client. +- [`http-server`](src/http-server.rs) - An HTTP server. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tower by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/src/http-client.rs b/src/http-client.rs new file mode 100644 index 0000000..52f4f64 --- /dev/null +++ b/src/http-client.rs @@ -0,0 +1,63 @@ +use futures::Future; +use hyper::{ + client::{connect::Destination, HttpConnector}, + Request, Response, Uri, +}; +use std::time::Duration; +use tower::{builder::ServiceBuilder, reconnect::Reconnect, Service, ServiceExt}; +use tower_hyper::{ + client::{Builder, Connect}, + retry::{Body, RetryPolicy}, + util::Connector, +}; + +fn main() { + let fut = futures::lazy(|| { + request().map(|resp| { + dbg!(resp); + }) + }); + hyper::rt::run(fut) +} + +fn request() -> impl Future, Error = ()> { + let connector = Connector::new(HttpConnector::new(1)); + let hyper = Connect::new(connector, Builder::new()); + + // RetryPolicy is a very simple policy that retries `n` times + // if the response has a 500 status code. Here, `n` is 5. + let policy = RetryPolicy::new(5); + // We're calling the tower/examples/server.rs. + let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap(); + + // Now, to build the service! We use two BufferLayers in order to: + // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer + // - meet `RetryLayer`'s requirement that our service implement `Service + Clone` + // - ..and to provide cheap clones on the service. + let maker = ServiceBuilder::new() + .buffer(5) + .rate_limit(5, Duration::from_secs(1)) + .concurrency_limit(5) + .retry(policy) + .buffer(5) + .make_service(hyper); + + // `Reconnect` accepts a destination and a MakeService, creating a new service + // any time the connection encounters an error. + let client = Reconnect::new(maker, dst); + + let request = Request::builder() + .method("GET") + .body(Body::from(Vec::new())) + .unwrap(); + + // we check to see if the client is ready to accept requests. + client + .ready() + .map_err(|e| panic!("Service is not ready: {:?}", e)) + .and_then(|mut c| { + c.call(request) + .map(|res| res.map(|b| b.into_inner())) + .map_err(|e| panic!("{:?}", e)) + }) +} diff --git a/src/http-server.rs b/src/http-server.rs new file mode 100644 index 0000000..c6e0f41 --- /dev/null +++ b/src/http-server.rs @@ -0,0 +1,68 @@ +use futures::{future, Future, Poll, Stream}; +use hyper::{self, Body, Request, Response}; +use tokio::net::TcpListener; +use tower::{builder::ServiceBuilder, Service}; +use tower_hyper::{body::LiftBody, server::Server}; + +fn main() { + hyper::rt::run(future::lazy(|| { + let addr = "127.0.0.1:3000".parse().unwrap(); + let bind = TcpListener::bind(&addr).expect("bind"); + + println!("Listening on http://{}", addr); + + let maker = ServiceBuilder::new() + .concurrency_limit(5) + .make_service(MakeSvc); + + let server = Server::new(maker); + + bind.incoming() + .fold(server, |mut server, stream| { + if let Err(e) = stream.set_nodelay(true) { + return Err(e); + } + + hyper::rt::spawn( + server + .serve(stream) + .map_err(|e| panic!("Server error {:?}", e)), + ); + + Ok(server) + }) + .map_err(|e| panic!("serve error: {:?}", e)) + .map(|_| {}) + })); +} + +struct Svc; +impl Service>> for Svc { + type Response = Response<&'static str>; + type Error = hyper::Error; + type Future = future::FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _req: Request>) -> Self::Future { + let res = Response::new("Hello World!"); + future::ok(res) + } +} + +struct MakeSvc; +impl Service<()> for MakeSvc { + type Response = Svc; + type Error = hyper::Error; + type Future = Box + Send + 'static>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { + Box::new(future::ok(Svc)) + } +}