diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5212e7f1..5479b088 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -259,3 +259,15 @@ jobs: flyctl deploy --remote-only env: FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + + deploy-rama-echo: + runs-on: ubuntu-latest + needs: deploy-rama-fp-docker + steps: + - uses: actions/checkout@v4 + - uses: superfly/flyctl-actions/setup-flyctl@master + - run: | + cd rama-fp/infra/deployments/echo + flyctl deploy --remote-only + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} diff --git a/Cargo.lock b/Cargo.lock index 995d13bd..88d1b318 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1151,6 +1151,7 @@ dependencies = [ "rama", "serde", "serde_json", + "serde_urlencoded", "tokio", "tracing", "tracing-subscriber", diff --git a/rama-fp/Cargo.toml b/rama-fp/Cargo.toml index ac25588e..dcc3f2d8 100644 --- a/rama-fp/Cargo.toml +++ b/rama-fp/Cargo.toml @@ -18,6 +18,7 @@ clap = { version = "4.4", features = ["derive"] } rama = { version = "0.2", path = "..", features = ["full"] } serde = "1.0" serde_json = "1.0" +serde_urlencoded = "0.7" tokio = { version = "1.35", features = ["rt-multi-thread", "macros"] } tracing = "0.1" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/rama-fp/README.md b/rama-fp/README.md index 15da4a6a..3e8109dd 100644 --- a/rama-fp/README.md +++ b/rama-fp/README.md @@ -45,6 +45,12 @@ Also hosted (via ) as http/1.1 only: - - +Finally you can also use the Rama FP Service as an echo service for any +method, path, query, body, and so on: + +- +- + Available at Docker Hub (latest main branch commit): - diff --git a/rama-fp/infra/deployments/echo/fly.toml b/rama-fp/infra/deployments/echo/fly.toml new file mode 100644 index 00000000..2220b6f0 --- /dev/null +++ b/rama-fp/infra/deployments/echo/fly.toml @@ -0,0 +1,37 @@ +app = 'rama-echo' +primary_region = 'lhr' + +[build] +image = "glendc/rama-fp:latest" + +[experimental] +cmd = ["./rama-fp", "-i", "0.0.0.0", "-p", "8080", "--http-version", "auto", "echo"] + +[[services]] +internal_port = 8080 +protocol = "tcp" +force_https = false +auto_stop_machines = true +auto_start_machines = true +min_machines_running = 1 +processes = ['app'] + +[[services.ports]] +port = "80" + +[[services]] +internal_port = 8443 +protocol = "tcp" +force_https = false +auto_stop_machines = true +auto_start_machines = true +min_machines_running = 1 +processes = ['app'] + +[[services.ports]] +port = "443" + +[[vm]] +memory = '256mb' +cpu_kind = 'shared' +cpus = 1 diff --git a/rama-fp/src/main.rs b/rama-fp/src/main.rs index 709e0287..3b21810b 100644 --- a/rama-fp/src/main.rs +++ b/rama-fp/src/main.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{Parser, Subcommand}; pub mod service; @@ -21,17 +21,45 @@ struct Cli { /// http version to serve FP Service from #[arg(long, default_value = "auto")] http_version: String, + + #[command(subcommand)] + command: Option, +} + +#[derive(Debug, Subcommand, Default)] +enum Commands { + /// Run the regular FP Server + #[default] + Run, + + /// Run an echo server + Echo, } #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Cli::parse(); - service::run(service::Config { - interface: args.interface, - port: args.port, - secure_port: args.secure_port, - http_version: args.http_version, - }) - .await + match args.command.unwrap_or_default() { + Commands::Run => { + service::run(service::Config { + interface: args.interface, + port: args.port, + secure_port: args.secure_port, + http_version: args.http_version, + }) + .await?; + } + Commands::Echo => { + service::echo(service::Config { + interface: args.interface, + port: args.port, + secure_port: args.secure_port, + http_version: args.http_version, + }) + .await?; + } + } + + Ok(()) } diff --git a/rama-fp/src/service/endpoints.rs b/rama-fp/src/service/endpoints.rs index 01e06020..4e92e888 100644 --- a/rama-fp/src/service/endpoints.rs +++ b/rama-fp/src/service/endpoints.rs @@ -1,3 +1,5 @@ +use std::{collections::HashMap, ops::Deref}; + use super::{ data::{ get_http_info, get_request_info, get_tls_info, DataSource, FetchMode, Initiator, @@ -7,11 +9,13 @@ use super::{ }; use rama::{ http::{ + dep::http_body_util::BodyExt, response::Json, service::web::extract::{self, FromRequestParts, Path}, Body, IntoResponse, Request, Response, StatusCode, }, service::Context, + stream::SocketInfo, }; use serde::Deserialize; use serde_json::json; @@ -339,6 +343,41 @@ pub async fn get_assets_script() -> Response { .expect("build js response") } +//------------------------------------------ +// endpoints: echo +//------------------------------------------ + +pub async fn echo(ctx: Context, req: Request) -> Json { + let http_info: super::data::HttpInfo = get_http_info(&req); + + let query_params = req.uri().query().and_then(|q| serde_urlencoded::from_str::>(q).ok()); + + let (parts, body) = req.into_parts(); + + let tls_info: Option = get_tls_info(&ctx); + + let request_info = get_request_info( + FetchMode::SameOrigin, + ResourceType::Document, + Initiator::Navigator, + &ctx, + &parts, + ) + .await; + + Json(json!({ + "version": request_info.version, + "scheme": request_info.scheme, + "method": request_info.method, + "path": request_info.path, + "ip": ctx.get::().unwrap().peer_addr(), + "headers": http_info.headers, + "parsedQueryParams": query_params, + "parsedBody": String::from_utf8_lossy(body.collect().await.unwrap().to_bytes().deref()), + "tls": tls_info, + })) +} + //------------------------------------------ // render utilities //------------------------------------------ diff --git a/rama-fp/src/service/mod.rs b/rama-fp/src/service/mod.rs index c0269bbf..fa790546 100644 --- a/rama-fp/src/service/mod.rs +++ b/rama-fp/src/service/mod.rs @@ -304,6 +304,185 @@ pub async fn run(cfg: Config) -> anyhow::Result<()> { Ok(()) } +pub async fn echo(cfg: Config) -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(fmt::layer()) + .with( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .init(); + + let graceful = rama::graceful::Shutdown::default(); + + let acme_data = if let Ok(raw_acme_data) = std::env::var("RAMA_FP_ACME_DATA") { + let acme_data: Vec<_> = raw_acme_data + .split(';') + .map(|s| { + let mut iter = s.trim().splitn(2, ','); + let key = iter.next().expect("acme data key"); + let value = iter.next().expect("acme data value"); + (key.to_owned(), value.to_owned()) + }) + .collect(); + ACMEData::with_challenges(acme_data) + } else { + ACMEData::default() + }; + + let http_address = format!("{}:{}", cfg.interface, cfg.port); + let https_address = format!("{}:{}", cfg.interface, cfg.secure_port); + + graceful.spawn_task_fn(|guard| async move { + let http_service = ServiceBuilder::new() + .layer(TraceLayer::new_for_http()) + .layer(CompressionLayer::new()) + .layer(CatchPanicLayer::new()) + .service( + WebService::default() + // ACME + .get( + "/.well-known/acme-challenge/:token", + endpoints::get_acme_challenge, + ) + // Echo + .not_found(endpoints::echo) + ); + + let tcp_service_builder = ServiceBuilder::new() + .map_result(|result| { + if let Err(err) = result { + tracing::warn!(error = %err, "rama service failed"); + } + Ok::<_, Infallible>(()) + }) + .layer(TimeoutLayer::new(Duration::from_secs(16))) + // Why the below layer makes it no longer cloneable?!?! + .layer(LimitLayer::new(ConcurrentPolicy::with_backoff( + 2048, + ExponentialBackoff::default(), + ))); + + // also spawn a TLS listener if tls_cert_dir is set + if let Ok(tls_cert_pem_raw) = std::env::var("RAMA_FP_TLS_CRT") { + let tls_key_pem_raw = std::env::var("RAMA_FP_TLS_KEY").expect("RAMA_FP_TLS_KEY"); + + let tls_listener = TcpListener::build_with_state(State::new(acme_data.clone())) + .bind(&https_address) + .await + .expect("bind TLS Listener"); + + let http_service = http_service.clone(); + + // create tls service builder + let server_config = + get_server_config(tls_cert_pem_raw, tls_key_pem_raw, cfg.http_version.as_str()) + .await + .expect("read rama-fp TLS server config"); + let tls_service_builder = + tcp_service_builder + .clone() + .layer(TlsAcceptorLayer::with_client_config_handler( + server_config, + TlsClientConfigHandler::default().store_client_hello(), + )); + + let http_version = cfg.http_version.clone(); + guard.spawn_task_fn(|guard| async move { + match http_version.as_str() { + "" | "auto" => { + tracing::info!("FP Secure Service (auto) listening on: {https_address}"); + tls_listener + .serve_graceful( + guard.clone(), + tls_service_builder.service( + HttpServer::auto(Executor::graceful(guard)) + .service(http_service), + ), + ) + .await; + } + "h1" | "http1" | "http/1" | "http/1.0" | "http/1.1" => { + tracing::info!( + "FP Secure Service (http/1.1) listening on: {https_address}" + ); + tls_listener + .serve_graceful( + guard, + tls_service_builder + .service(HttpServer::http1().service(http_service)), + ) + .await; + } + "h2" | "http2" | "http/2" | "http/2.0" => { + tracing::info!("FP Secure Service (h2) listening on: {https_address}"); + tls_listener + .serve_graceful( + guard.clone(), + tls_service_builder.service( + HttpServer::h2(Executor::graceful(guard)).service(http_service), + ), + ) + .await; + } + _version => { + panic!("unsupported http version: {http_version}") + } + } + }); + } + + let tcp_listener = TcpListener::build_with_state(State::new(acme_data)) + .bind(&http_address) + .await + .expect("bind TCP Listener"); + + match cfg.http_version.as_str() { + "" | "auto" => { + tracing::info!("FP Echo Service (auto) listening on: {http_address}"); + tcp_listener + .serve_graceful( + guard.clone(), + tcp_service_builder.service( + HttpServer::auto(Executor::graceful(guard)).service(http_service), + ), + ) + .await; + } + "h1" | "http1" | "http/1" | "http/1.0" | "http/1.1" => { + tracing::info!("FP Echo Service (http/1.1) listening on: {http_address}"); + tcp_listener + .serve_graceful( + guard, + tcp_service_builder.service(HttpServer::http1().service(http_service)), + ) + .await; + } + "h2" | "http2" | "http/2" | "http/2.0" => { + tracing::info!("FP Echo Service (h2) listening on: {http_address}"); + tcp_listener + .serve_graceful( + guard.clone(), + tcp_service_builder.service( + HttpServer::h2(Executor::graceful(guard)).service(http_service), + ), + ) + .await; + } + _version => { + panic!("unsupported http version: {}", cfg.http_version) + } + } + }); + + graceful + .shutdown_with_limit(Duration::from_secs(30)) + .await?; + + Ok(()) +} + async fn get_server_config( tls_cert_pem_raw: String, tls_key_pem_raw: String,