diff --git a/README.md b/README.md index 464be212..99ba33ec 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - [HTTP Headers](./examples/http_headers/) - [HTTP Response body](./examples/http_body/) - [HTTP Configuration](./examples/http_config/) +- [HTTP Parallel Call](./examples/http_parallel_call/) - [gRPC Auth (random)](./examples/grpc_auth_random/) ## Articles & blog posts from the community diff --git a/examples/http_parallel_call/Cargo.toml b/examples/http_parallel_call/Cargo.toml new file mode 100644 index 00000000..5328fcf6 --- /dev/null +++ b/examples/http_parallel_call/Cargo.toml @@ -0,0 +1,22 @@ +[package] +publish = false +name = "proxy-wasm-example-http-parallel-call" +version = "0.0.1" +authors = ["Zhuozhi Ji "] +description = "Proxy-Wasm plugin example: HTTP parallel call" +license = "Apache-2.0" +edition = "2018" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +log = "0.4" +proxy-wasm = { path = "../../" } + +[profile.release] +lto = true +opt-level = 3 +codegen-units = 1 +panic = "abort" +strip = "debuginfo" diff --git a/examples/http_parallel_call/README.md b/examples/http_parallel_call/README.md new file mode 100644 index 00000000..3ff62d5f --- /dev/null +++ b/examples/http_parallel_call/README.md @@ -0,0 +1,27 @@ +## Proxy-Wasm plugin example: HTTP parallel call + +Proxy-Wasm plugin that makes multiply HTTP callout and combine responses as final response . + +### Building + +```sh +$ cargo build --target wasm32-wasi --release +``` + +### Using in Envoy + +This example can be run with [`docker compose`](https://docs.docker.com/compose/install/) +and has a matching Envoy configuration. + +```sh +$ docker compose up +``` + +#### Access granted. + +Send HTTP request to `localhost:10000/headers`: + +```sh +$ curl localhost:10000/headers +Hello, World!\n +``` diff --git a/examples/http_parallel_call/docker-compose.yaml b/examples/http_parallel_call/docker-compose.yaml new file mode 100644 index 00000000..6a188511 --- /dev/null +++ b/examples/http_parallel_call/docker-compose.yaml @@ -0,0 +1,36 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + envoy: + image: envoyproxy/envoy:v1.31-latest + hostname: envoy + ports: + - "10000:10000" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml + - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins + networks: + - envoymesh + depends_on: + - httpbin + httpbin: + image: mccutchen/go-httpbin + hostname: httpbin + ports: + - "8080:8080" + networks: + - envoymesh +networks: + envoymesh: {} diff --git a/examples/http_parallel_call/envoy.yaml b/examples/http_parallel_call/envoy.yaml new file mode 100644 index 00000000..61fdf8da --- /dev/null +++ b/examples/http_parallel_call/envoy.yaml @@ -0,0 +1,68 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +static_resources: + listeners: + address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + codec_type: AUTO + route_config: + name: local_routes + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: httpbin + http_filters: + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + value: + config: + name: "http_parallel_call" + vm_config: + runtime: "envoy.wasm.runtime.v8" + code: + local: + filename: "/etc/envoy/proxy-wasm-plugins/proxy_wasm_example_http_parallel_call.wasm" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: httpbin + connect_timeout: 5s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: httpbin + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: httpbin + port_value: 8080 diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs new file mode 100644 index 00000000..6616c59a --- /dev/null +++ b/examples/http_parallel_call/src/lib.rs @@ -0,0 +1,129 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use proxy_wasm::hostcalls; +use proxy_wasm::promise::Promise; +use proxy_wasm::traits::*; +use proxy_wasm::types::*; +use std::collections::HashMap; +use std::rc::Rc; +use std::time::Duration; + +proxy_wasm::main! {{ + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); +}} + +#[derive(Default)] +struct HttpParallelCall { + m: HashMap>>, +} + +impl HttpContext for HttpParallelCall { + fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { + // "Hello, " + let token1 = self + .dispatch_http_call( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/SGVsbG8sIAo="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .unwrap(); + + // "World!" + let token2 = self + .dispatch_http_call( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/V29ybGQhCg=="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .unwrap(); + + let promise1 = Promise::new(); + let promise2 = Promise::new(); + self.m.insert(token1, promise1.clone()); + self.m.insert(token2, promise2.clone()); + + Promise::all_of(vec![ + promise1 + .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|body| body.unwrap_or_else(|| "".to_string())), + promise2 + .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|body| body.unwrap_or_else(|| "".to_string())), + ]) + .then(|results| { + send_http_response( + 200, + vec![], + Some( + format!( + "{}{}\n", + results[0].strip_suffix("\n").unwrap(), + results[1].strip_suffix("\n").unwrap() + ) + .as_bytes(), + ), + ); + }); + + Action::Pause + } + + fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action { + self.set_http_response_header("Powered-By", Some("proxy-wasm")); + Action::Continue + } +} + +impl Context for HttpParallelCall { + fn on_http_call_response( + &mut self, + _token_id: u32, + _num_headers: usize, + _body_size: usize, + _num_trailers: usize, + ) { + let promise = self.m.remove(&_token_id); + promise + .unwrap() + .fulfill((_token_id, _num_headers, _body_size, _num_trailers)); + } +} + +fn get_http_call_response_body_string(start: usize, max_size: usize) -> Option { + match hostcalls::get_buffer(BufferType::HttpCallResponseBody, start, max_size).unwrap() { + None => None, + Some(bytes) => { + let body_string = String::from_utf8(bytes.to_vec()).unwrap(); + Some(body_string) + } + } +} + +fn send_http_response(status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>) { + hostcalls::send_http_response(status_code, headers, body).unwrap() +} diff --git a/src/lib.rs b/src/lib.rs index a8f42651..05d23579 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod hostcalls; +pub mod promise; pub mod traits; pub mod types; diff --git a/src/promise.rs b/src/promise.rs new file mode 100644 index 00000000..cf0f05d8 --- /dev/null +++ b/src/promise.rs @@ -0,0 +1,291 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cell::RefCell; +use std::rc::Rc; + +enum PromiseState { + Pending, + Fulfilled(T), + Rejected(String), +} + +pub struct Promise { + state: RefCell>, + then_callback: RefCell>>, + catch_callback: RefCell>>, +} + +impl Promise +where + T: 'static + Clone, +{ + pub fn new() -> Rc { + Rc::new(Self { + state: RefCell::new(PromiseState::Pending), + then_callback: RefCell::new(None), + catch_callback: RefCell::new(None), + }) + } + + pub fn fulfill(self: &Rc, value: T) { + *self.state.borrow_mut() = PromiseState::Fulfilled(value.clone()); + if let Some(callback) = self.then_callback.borrow_mut().take() { + callback(value); + } + } + + pub fn reject(self: &Rc, reason: String) { + *self.state.borrow_mut() = PromiseState::Rejected(reason.clone()); + if let Some(callback) = self.catch_callback.borrow_mut().take() { + callback(reason); + } + } + + pub fn then(self: &Rc, f: F) -> Rc> + where + F: FnOnce(T) -> R + 'static, + R: 'static + Clone, + { + let new_promise = Promise::new(); + let new_promise_clone = new_promise.clone(); + match &*self.state.borrow() { + PromiseState::Pending => { + *self.then_callback.borrow_mut() = Some(Box::new(move |value| { + let result = f(value.clone()); + new_promise_clone.fulfill(result); + })); + let new_promise_for_catch = new_promise.clone(); + *self.catch_callback.borrow_mut() = Some(Box::new(move |reason| { + new_promise_for_catch.reject(reason); + })); + } + PromiseState::Fulfilled(value) => { + let result = f(value.clone()); + new_promise.fulfill(result); + } + PromiseState::Rejected(reason) => new_promise.reject(reason.clone()), + } + new_promise + } + + pub fn catch(self: &Rc, f: F) -> Rc + where + F: FnOnce(String) + 'static, + { + match &*self.state.borrow() { + PromiseState::Pending => *self.catch_callback.borrow_mut() = Some(Box::new(f)), + PromiseState::Fulfilled(_) => {} + PromiseState::Rejected(reason) => f(reason.clone()), + } + self.clone() + } + + pub fn all_of(promises: Vec>) -> Rc>> { + let next_promise = Promise::new(); + let total = promises.len(); + let results = Rc::new(RefCell::new(vec![None; total])); + let remaining = Rc::new(RefCell::new(total)); + let rejected = Rc::new(RefCell::new(false)); + + for (i, promise) in promises.iter().enumerate() { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let results_clone = results.clone(); + let remaining_clone = remaining.clone(); + let rejected_clone = rejected.clone(); + let rejected_clone_for_catch = rejected.clone(); + promise + .then(move |result| { + if *rejected_clone.borrow() { + return; + } + results_clone.borrow_mut()[i] = Some(result); + *remaining_clone.borrow_mut() -= 1; + + if *remaining_clone.borrow() == 0 { + let final_results: Vec = results_clone + .borrow_mut() + .iter_mut() + .map(|res| res.take().unwrap()) + .collect(); + next_promise_clone.fulfill(final_results); + } + }) + .catch(move |reason| { + if !*rejected_clone_for_catch.borrow() { + *rejected_clone_for_catch.borrow_mut() = true; + next_promise_clone_for_catch.reject(reason.clone()); + } + }); + } + next_promise + } + + pub fn any_of(promises: Vec>) -> Rc> { + let next_promise = Promise::new(); + let total = promises.len(); + let remaining = Rc::new(RefCell::new(total)); + let first_error = Rc::new(RefCell::new(None)); // 用来保存第一个错误 + + for promise in promises { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let remaining_clone = remaining.clone(); + let remaining_clone_for_catch = remaining.clone(); + let first_error_clone = first_error.clone(); + + promise + .then(move |result| { + if *remaining_clone.borrow() > 0 { + next_promise_clone.fulfill(result); + *remaining_clone.borrow_mut() = 0; + } + }) + .catch(move |err| { + if first_error_clone.borrow().is_none() { + *first_error_clone.borrow_mut() = Some(err.clone()); + } + + *remaining_clone_for_catch.borrow_mut() -= 1; + + if *remaining_clone_for_catch.borrow() == 0 { + if let Some(first_err) = first_error_clone.borrow().clone() { + next_promise_clone_for_catch.reject(first_err); + } + } + }); + } + + next_promise + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_promise_new() { + let promise = Promise::::new(); + assert!(matches!(*promise.state.borrow(), PromiseState::Pending)); + assert!(promise.then_callback.borrow().is_none()); + assert!(promise.catch_callback.borrow().is_none()); + } + + #[test] + fn test_promise_fulfill() { + let promise = Promise::::new(); + let next_promise = promise.then(|result| { + assert_eq!(result, 42); + }); + + promise.fulfill(42); + } + + #[test] + fn test_promise_reject() { + let promise = Promise::::new(); + let next_promise = promise.catch(|err| { + assert_eq!(err, "Error"); + }); + + promise.reject("Error".to_string()); + } + + #[test] + fn test_promise_chain() { + let promise = Promise::::new(); + let next_promise = promise.then(|result| { + assert_eq!(result, 10); + 20 + }); + + next_promise.then(|result| { + assert_eq!(result, 20); + }); + + promise.fulfill(10); + } + + #[test] + fn test_all_of_success() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.fulfill(42); + promise2.fulfill(100); + + all_promise + .then(|results| { + assert_eq!(results.len(), 2); + assert_eq!(results[0], 42); + assert_eq!(results[1], 100); + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + } + + #[test] + fn test_all_of_failure() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error 1".to_string()); + promise2.reject("Error 2".to_string()); + + all_promise + .then(|_results| { + panic!("Should not reach here"); + }) + .catch(|err| { + assert_eq!(err, "Error 1"); + }); + } + + #[test] + fn test_all_of_mixed_results() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error".to_string()); + promise2.fulfill(100); + + all_promise + .then(|_| { + panic!("Should not reach here"); + }) + .catch(|reason| assert_eq!(reason, "Error".to_string())); + } + + #[test] + fn test_all_of_empty() { + let all_promise = Promise::::all_of(vec![]); + + all_promise + .then(|results| { + assert!(results.is_empty()); + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + } +}