From 92be54ebfe4a2d19101141f55e94fc8e9588ff95 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Mon, 6 Jan 2025 22:47:07 +0100 Subject: [PATCH] feat: Add AWS SQS support (#1554) * add sqs only vuild sqs first version of terraform move add docs typechange works wip runtime Use arm64 fix dockerfile use musl add dlq works first time working try again less memory clean up add env vars support send webhook fix build fix http fix terraform * format better * add webdriver config * ok * fix webdriver * beta * add docker command --- .envrc | 3 + .github/workflows/deploy_backend.yml | 39 ++++- Cargo.lock | 181 ++++++++++++++++++++++- Cargo.toml | 2 +- backend/Cargo.toml | 3 +- backend/backend_config.toml | 18 ++- backend/src/config.rs | 7 +- backend/src/http/v0/check_email/post.rs | 1 + backend/src/lib.rs | 2 +- backend/src/worker/do_work.rs | 22 ++- cli/Cargo.toml | 2 +- core/Cargo.toml | 2 +- core/src/smtp/headless.rs | 48 +++++-- core/src/smtp/mod.rs | 20 ++- core/src/smtp/outlook/headless.rs | 31 ++-- core/src/smtp/yahoo/headless.rs | 44 ++++-- core/src/util/input_output.rs | 16 +++ sqs/.gitignore | 2 + sqs/Cargo.toml | 19 +++ sqs/Dockerfile | 33 +++++ sqs/README.md | 7 + sqs/main.tf | 183 ++++++++++++++++++++++++ sqs/src/main.rs | 157 ++++++++++++++++++++ 23 files changed, 769 insertions(+), 73 deletions(-) create mode 100644 .envrc create mode 100644 sqs/.gitignore create mode 100644 sqs/Cargo.toml create mode 100644 sqs/Dockerfile create mode 100644 sqs/README.md create mode 100644 sqs/main.tf create mode 100644 sqs/src/main.rs diff --git a/.envrc b/.envrc new file mode 100644 index 000000000..01966e504 --- /dev/null +++ b/.envrc @@ -0,0 +1,3 @@ +# Config for direnv +# https://direnv.net/ +dotenv diff --git a/.github/workflows/deploy_backend.yml b/.github/workflows/deploy_backend.yml index 04ce00f57..cb0eaa7b7 100644 --- a/.github/workflows/deploy_backend.yml +++ b/.github/workflows/deploy_backend.yml @@ -6,20 +6,51 @@ on: - "v*.*.*" jobs: - docker-publish: + get-tag: runs-on: ubuntu-latest + outputs: + GITHUB_TAG: ${{ steps.vars.outputs.GITHUB_TAG }} steps: - uses: actions/checkout@master - name: Set GITHUB_TAG arg id: vars run: echo ::set-output name=GITHUB_TAG::${GITHUB_REF:10} # Remove /refs/head/ + + docker-publish: + runs-on: ubuntu-latest + needs: get-tag + steps: + - uses: actions/checkout@master - name: Print version - run: echo "Publishing reacherhq/backend:${{ steps.vars.outputs.GITHUB_TAG }}" - - name: Publish to Registry + run: echo "Publishing reacherhq/backend:${{ needs.get-tag.outputs.GITHUB_TAG }}" + - name: Publish to Docker Hub uses: elgohr/Publish-Docker-Github-Action@v5 with: name: reacherhq/backend dockerfile: backend/Dockerfile username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - tags: "${{ steps.vars.outputs.GITHUB_TAG }},beta" + tags: "${{ needs.get-tag.outputs.GITHUB_TAG }},beta" + + ecr-publish: + runs-on: ubuntu-latest + needs: get-tag + steps: + - uses: actions/checkout@master + - name: Print version + run: echo "Publishing reacherhq/backend:${{ needs.get-tag.outputs.GITHUB_TAG }}" + - name: Login to ECR + id: ecr + uses: elgohr/ecr-login-action@v3 + with: + access_key: ${{ secrets.AWS_ACCESS_KEY }} + secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + region: ${{ secrets.AWS_REGION }} + - name: Publish to AWS ECR + uses: elgohr/Publish-Docker-Github-Action@v5 + with: + name: reacherhq/sqs + username: ${{ steps.ecr.outputs.username }} + password: ${{ steps.ecr.outputs.password }} + registry: ${{ steps.ecr.outputs.registry }} + tags: "${{ needs.get-tag.outputs.GITHUB_TAG }},beta" diff --git a/Cargo.lock b/Cargo.lock index b2f4b1033..cb3933c4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,6 +335,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "async-task" version = "4.7.1" @@ -579,7 +601,7 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "check-if-email-exists" -version = "0.10.0" +version = "0.10.2" dependencies = [ "anyhow", "async-recursion", @@ -611,7 +633,7 @@ dependencies = [ [[package]] name = "check-if-email-exists-cli" -version = "0.10.0" +version = "0.10.2" dependencies = [ "anyhow", "check-if-email-exists", @@ -1732,6 +1754,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-serde" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f056c8559e3757392c8d091e796416e4649d8e49e88b8d76df6c002f05027fd" +dependencies = [ + "http 1.2.0", + "serde", +] + [[package]] name = "httparse" version = "1.9.5" @@ -2107,6 +2139,55 @@ dependencies = [ "serde", ] +[[package]] +name = "lambda_runtime" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed49669d6430292aead991e19bf13153135a884f916e68f32997c951af637ebe" +dependencies = [ + "async-stream", + "base64 0.22.1", + "bytes", + "futures", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "http-serde", + "hyper 1.5.2", + "hyper-util", + "lambda_runtime_api_client", + "pin-project", + "serde", + "serde_json", + "serde_path_to_error", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tracing", +] + +[[package]] +name = "lambda_runtime_api_client" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c90a10f094475a34a04da2be11686c4dcfe214d93413162db9ffdff3d3af293a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.2", + "hyper-util", + "tokio", + "tower", + "tower-service", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lapin" version = "2.5.0" @@ -2248,6 +2329,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -2972,7 +3062,7 @@ dependencies = [ [[package]] name = "reacher_backend" -version = "0.10.0" +version = "0.10.2" dependencies = [ "anyhow", "async-smtp", @@ -2981,6 +3071,7 @@ dependencies = [ "csv", "dotenv", "futures", + "http 1.2.0", "lapin", "openssl", "reqwest", @@ -3001,6 +3092,21 @@ dependencies = [ "warp", ] +[[package]] +name = "reacher_sqs" +version = "0.10.2" +dependencies = [ + "check-if-email-exists", + "lambda_runtime", + "reacher_backend", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "reactor-trait" version = "1.1.0" @@ -3029,8 +3135,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -3041,9 +3156,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -3522,6 +3643,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -4264,6 +4395,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -4314,18 +4466,35 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 861920f11..04ce3c774 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = ["backend", "cli", "core"] +members = ["backend", "cli", "core", "sqs"] diff --git a/backend/Cargo.toml b/backend/Cargo.toml index c2eaa105d..2e58fb01a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "reacher_backend" -version = "0.10.0" +version = "0.10.2" edition = "2018" license = "AGPL-3.0" publish = false @@ -13,6 +13,7 @@ config = "0.14" csv = "1.3.0" dotenv = "0.15.0" futures = { version = "0.3.30" } +http = "1.2.0" lapin = { version = "2.3.1" } tokio-executor-trait = { version = "2.1.1" } tokio-reactor-trait = { version = "1.1.0" } diff --git a/backend/backend_config.toml b/backend/backend_config.toml index aee353a2f..f353a3638 100644 --- a/backend/backend_config.toml +++ b/backend/backend_config.toml @@ -33,11 +33,6 @@ hello_name = "localhost" # Env variable: RCH__FROM_EMAIL from_email = "hello@localhost" -# Address of the Chrome WebDriver server for headless email verifications. -# -# Env variable: RCH__WEBDRIVER_ADDR -webdriver_addr = "http://localhost:9515" - # Timeout for each SMTP connection, in seconds. Leaving it commented out will # not set a timeout, i.e. the connection will wait indefinitely. # @@ -47,7 +42,18 @@ webdriver_addr = "http://localhost:9515" # Optional Sentry DSN. If set, all errors will be sent to Sentry. # # Env variable: RCH__SENTRY_DSN -# sentry_dsn = "" +# sentry_dsn = "" + +# Address of the Chrome WebDriver server for headless email verifications. +# +# Env variable: RCH__WEBDRIVER_ADDR +webdriver_addr = "http://localhost:9515" + +[webdriver] +# Path to the Chrome binary. If not set, the default system Chrome will be used. +# +# Env variable: RCH__WEBDRIVER__BINARY +# binary = "/usr/bin/google-chrome" # Uncomment the lines below to route all SMTP verification requests # through a specified proxy. Note that the proxy must be a SOCKS5 proxy to work diff --git a/backend/src/config.rs b/backend/src/config.rs index a9455d738..89aee436e 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -21,7 +21,7 @@ use crate::worker::setup_rabbit_mq; use anyhow::{bail, Context}; use check_if_email_exists::{ CheckEmailInputProxy, GmailVerifMethod, HotmailB2BVerifMethod, HotmailB2CVerifMethod, - YahooVerifMethod, LOG_TARGET, + WebdriverConfig, YahooVerifMethod, LOG_TARGET, }; use config::Config; use lapin::Channel; @@ -39,6 +39,7 @@ pub struct BackendConfig { pub from_email: String, pub hello_name: String, pub webdriver_addr: String, + pub webdriver: WebdriverConfig, pub proxy: Option, /// Verification method configuration. @@ -88,6 +89,7 @@ impl BackendConfig { from_email: "".to_string(), hello_name: "".to_string(), webdriver_addr: "".to_string(), + webdriver: WebdriverConfig::default(), proxy: None, verif_method: VerifMethodConfig::default(), http_host: "127.0.0.1".to_string(), @@ -158,8 +160,7 @@ impl BackendConfig { Ok(()) } - /// Get all storages as a Vec. We don't really care about the keys in the - /// HashMap, except for deserialize purposes. + /// Get the Postgres connection pool, if the storage is Postgres. pub fn get_storage_adapter(&self) -> Arc { self.storage_adapter.clone() } diff --git a/backend/src/http/v0/check_email/post.rs b/backend/src/http/v0/check_email/post.rs index d9a39c060..4cb746a73 100644 --- a/backend/src/http/v0/check_email/post.rs +++ b/backend/src/http/v0/check_email/post.rs @@ -68,6 +68,7 @@ impl CheckEmailRequest { sentry_dsn: config.sentry_dsn.clone(), backend_name: config.backend_name.clone(), retries: 2, + webdriver_config: config.webdriver.clone(), ..Default::default() } } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index a44ccc9d8..68e6021dc 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -16,7 +16,7 @@ pub mod config; pub mod http; -mod storage; +pub mod storage; pub mod throttle; pub mod worker; diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs index bfe2c928c..194020820 100644 --- a/backend/src/worker/do_work.rs +++ b/backend/src/worker/do_work.rs @@ -21,9 +21,12 @@ use crate::worker::single_shot::send_single_shot_reply; use check_if_email_exists::{ check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET, }; +use http::HeaderMap; use lapin::message::Delivery; use lapin::{options::*, Channel}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::convert::TryInto; use std::fmt::Debug; use std::sync::Arc; use thiserror::Error; @@ -38,6 +41,7 @@ pub struct CheckEmailTask { } #[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] pub enum CheckEmailJobId { /// Single-shot email verification, they won't have an actual job id. SingleShot, @@ -58,6 +62,8 @@ pub enum TaskError { Lapin(lapin::Error), #[error("Reqwest error during webhook: {0}")] Reqwest(reqwest::Error), + #[error("Error converting headers: {0}")] + Headers(#[from] http::Error), } impl TaskError { @@ -67,6 +73,7 @@ impl TaskError { Self::Throttle(_) => StatusCode::TOO_MANY_REQUESTS, Self::Lapin(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::Reqwest(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Headers(_) => StatusCode::INTERNAL_SERVER_ERROR, } } } @@ -100,6 +107,7 @@ pub struct TaskWebhook { #[derive(Debug, Deserialize, Clone, Serialize)] pub struct Webhook { pub url: String, + pub headers: HashMap, pub extra: Option, } @@ -116,7 +124,7 @@ pub(crate) async fn do_check_email_work( channel: Arc, config: Arc, ) -> Result<(), anyhow::Error> { - let worker_output = inner_check_email(task).await; + let worker_output = check_email_and_send_result(task).await; match (&worker_output, delivery.redelivered) { (Ok(output), false) if output.is_reachable == Reachable::Unknown => { @@ -167,7 +175,10 @@ pub(crate) async fn do_check_email_work( Ok(()) } -async fn inner_check_email(task: &CheckEmailTask) -> Result { +/// Checks the email and sends the result to the webhook. +pub async fn check_email_and_send_result( + task: &CheckEmailTask, +) -> Result { let output = check_email(&task.input).await; // Check if we have a webhook to send the output to. @@ -180,14 +191,13 @@ async fn inner_check_email(task: &CheckEmailTask) -> Result"] categories = ["email"] description = "Check if an email address exists without sending any email" diff --git a/core/src/smtp/headless.rs b/core/src/smtp/headless.rs index 21a37b647..1720bfaf8 100644 --- a/core/src/smtp/headless.rs +++ b/core/src/smtp/headless.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use crate::util::ser_with_display::ser_with_display; +use crate::{util::ser_with_display::ser_with_display, WebdriverConfig}; use fantoccini::{ error::{CmdError, NewSessionError}, Client, ClientBuilder, @@ -36,20 +36,40 @@ pub enum HeadlessError { NewSession(#[from] NewSessionError), } -pub async fn create_headless_client(webdriver: &str) -> Result { - // Running in a Docker container, I run into the following error: - // Failed to move to new namespace: PID namespaces supported, Network namespace supported, but failed: errno = Operation not permitted - // In searching around I found a few different workarounds: - // - Enable namespaces: https://github.com/jessfraz/dockerfiles/issues/65#issuecomment-266532289 - // - Run it with a custom seccomp: https://github.com/jessfraz/dockerfiles/issues/65#issuecomment-217214671 - // - Run with --no-sandbox: https://github.com/karma-runner/karma-chrome-launcher/issues/125#issuecomment-312668593 - // For now I went with the --no-sandbox. - // - // TODO Look into security implications... - let mut caps = Map::new(); - let opts = serde_json::json!({ - "args": ["--headless", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage"], +pub async fn create_headless_client( + webdriver: &str, + webdriver_config: &WebdriverConfig, +) -> Result { + let mut opts = serde_json::json!({ + "args": [ + "--headless=new", "--disable-gpu", "--disable-dev-shm-usage", + // Running in a Docker container, I run into the following error: + // Failed to move to new namespace: PID namespaces supported, Network namespace supported, but failed: errno = Operation not permitted + // In searching around I found a few different workarounds: + // - Enable namespaces: https://github.com/jessfraz/dockerfiles/issues/65#issuecomment-266532289 + // - Run it with a custom seccomp: https://github.com/jessfraz/dockerfiles/issues/65#issuecomment-217214671 + // - Run with --no-sandbox: https://github.com/karma-runner/karma-chrome-launcher/issues/125#issuecomment-312668593 + // For now I went with the --no-sandbox. + // + // TODO Look into security implications... + "--no-sandbox", + // From https://github.com/chromium-for-lambda/chromium-binaries/tree/b23e11c2f2859b177fd08fe50a0826c17652d846?tab=readme-ov-file#installation-via-a-lambda-layer + "--use-gl=angle", "--use-angle=swiftshader", "--single-process", "--no-zygote", + // Disable anything that might consume memory + "--window-size=800x600", + "--disable-extensions", + "--disable-software-rasterizer", + "--disable-dev-shm-usage", + "--disable-background-networking", + "--js-flags=\"--max-old-space-size=256\"", + ] }); + + if let Some(binary) = &webdriver_config.binary { + opts["binary"] = serde_json::json!(binary); + } + + let mut caps = Map::new(); caps.insert("goog:chromeOptions".to_string(), opts); // Connect to WebDriver instance that is listening on `webdriver` diff --git a/core/src/smtp/mod.rs b/core/src/smtp/mod.rs index 6ee92f65d..b5ccbf4f0 100644 --- a/core/src/smtp/mod.rs +++ b/core/src/smtp/mod.rs @@ -100,9 +100,13 @@ pub async fn check_smtp( if is_hotmail_b2c(&host_str) { if let HotmailB2CVerifMethod::Headless = &input.hotmailb2c_verif_method { return ( - outlook::headless::check_password_recovery(&to_email_str, &input.webdriver_addr) - .await - .map_err(Into::into), + outlook::headless::check_password_recovery( + &to_email_str, + &input.webdriver_addr, + &input.webdriver_config, + ) + .await + .map_err(Into::into), SmtpDebug { verif_method: VerifMethod::Headless, }, @@ -133,9 +137,13 @@ pub async fn check_smtp( } YahooVerifMethod::Headless => { return ( - yahoo::check_headless(&to_email_str, &input.webdriver_addr) - .await - .map_err(Into::into), + yahoo::check_headless( + &to_email_str, + &input.webdriver_addr, + &input.webdriver_config, + ) + .await + .map_err(Into::into), SmtpDebug { verif_method: VerifMethod::Headless, }, diff --git a/core/src/smtp/outlook/headless.rs b/core/src/smtp/outlook/headless.rs index 5ad083aec..1a094beaa 100644 --- a/core/src/smtp/outlook/headless.rs +++ b/core/src/smtp/outlook/headless.rs @@ -24,7 +24,7 @@ use crate::{ headless::{create_headless_client, HeadlessError}, SmtpDetails, }, - LOG_TARGET, + WebdriverConfig, LOG_TARGET, }; /// Check if a Hotmail/Outlook email exists by connecting to the password @@ -34,6 +34,7 @@ use crate::{ pub async fn check_password_recovery( to_email: &str, webdriver: &str, + webdriver_config: &WebdriverConfig, ) -> Result { let to_email = to_email.to_string(); tracing::debug!( @@ -42,7 +43,7 @@ pub async fn check_password_recovery( "Using Hotmail password recovery in headless navigator" ); - let c = create_headless_client(webdriver).await?; + let c = create_headless_client(webdriver, webdriver_config).await?; // Navigate to Microsoft password recovery page. c.goto("https://account.live.com/password/reset").await?; @@ -127,6 +128,7 @@ pub async fn check_password_recovery( #[cfg(test)] mod tests { use super::check_password_recovery; + use crate::WebdriverConfig; use futures::future::join; // Ignoring this test as it requires a local process of WebDriver running on @@ -140,15 +142,23 @@ mod tests { // It should not error. for _ in 0..10 { // This email does not exist. - let res = check_password_recovery("test42134@hotmail.com", "http://localhost:9515") - .await - .unwrap(); + let res = check_password_recovery( + "test42134@hotmail.com", + "http://localhost:9515", + &WebdriverConfig::default(), + ) + .await + .unwrap(); assert!(!res.is_deliverable); // This email does exist. - let res = check_password_recovery("test@hotmail.com", "http://localhost:9515") - .await - .unwrap(); + let res = check_password_recovery( + "test@hotmail.com", + "http://localhost:9515", + &WebdriverConfig::default(), + ) + .await + .unwrap(); assert!(res.is_deliverable); } } @@ -160,9 +170,10 @@ mod tests { #[tokio::test] #[ignore = "Run a webdriver server locally to test this"] async fn test_parallel() { + let webdriver_config = WebdriverConfig::default(); // This email does not exist. - let f1 = check_password_recovery("foo@bar.baz", "http://localhost:9515"); - let f2 = check_password_recovery("foo@bar.baz", "http://localhost:9515"); + let f1 = check_password_recovery("foo@bar.baz", "http://localhost:9515", &webdriver_config); + let f2 = check_password_recovery("foo@bar.baz", "http://localhost:9515", &webdriver_config); let f = join(f1, f2).await; assert!(f.0.is_ok(), "{:?}", f); diff --git a/core/src/smtp/yahoo/headless.rs b/core/src/smtp/yahoo/headless.rs index c686c14b7..784a5d5d2 100644 --- a/core/src/smtp/yahoo/headless.rs +++ b/core/src/smtp/yahoo/headless.rs @@ -24,13 +24,18 @@ use futures::future::select_ok; use futures::{Future, TryFutureExt}; use crate::smtp::headless::{create_headless_client, HeadlessError}; +use crate::WebdriverConfig; use crate::{smtp::SmtpDetails, LOG_TARGET}; /// Check if a Hotmail/Outlook email exists by connecting to the password /// recovery page https://account.live.com/password/reset using a headless /// browser. Make sure you have a WebDriver server running locally before /// running this, or this will error. -pub async fn check_headless(to_email: &str, webdriver: &str) -> Result { +pub async fn check_headless( + to_email: &str, + webdriver: &str, + webdriver_config: &WebdriverConfig, +) -> Result { let mut attempts = 0; let max_attempts = 3; let mut last_error = None; @@ -44,7 +49,7 @@ pub async fn check_headless(to_email: &str, webdriver: &str) -> Result return Ok(result), Err(e) => { last_error = Some(e); @@ -61,8 +66,9 @@ pub async fn check_headless(to_email: &str, webdriver: &str) -> Result Result { - let c = create_headless_client(webdriver).await?; + let c = create_headless_client(webdriver, webdriver_config).await?; // Navigate to Microsoft password recovery page. c.goto("https://login.yahoo.com/forgot").await?; @@ -132,7 +138,7 @@ async fn check_headless_inner( #[cfg(test)] mod tests { - use crate::initialize_crypto_provider; + use crate::{initialize_crypto_provider, WebdriverConfig}; use super::check_headless; @@ -147,22 +153,34 @@ mod tests { // Run 5 headless sessions with the below dummy emails. for _ in 0..5 { // Email does not exist. - let res = check_headless("test42134@yahoo.com", "http://localhost:9515") - .await - .unwrap(); + let res = check_headless( + "test42134@yahoo.com", + "http://localhost:9515", + &WebdriverConfig::default(), + ) + .await + .unwrap(); assert!(!res.is_deliverable); // Disabled email. - let res = check_headless("amaury@yahoo.com", "http://localhost:9515") - .await - .unwrap(); + let res = check_headless( + "amaury@yahoo.com", + "http://localhost:9515", + &WebdriverConfig::default(), + ) + .await + .unwrap(); assert!(!res.is_deliverable); assert!(res.is_disabled); // OK email. - let res = check_headless("test2@yahoo.com", "http://localhost:9515") - .await - .unwrap(); + let res = check_headless( + "test2@yahoo.com", + "http://localhost:9515", + &WebdriverConfig::default(), + ) + .await + .unwrap(); assert!(res.is_deliverable); assert!(!res.is_disabled); } diff --git a/core/src/util/input_output.rs b/core/src/util/input_output.rs index b255eb5b3..0190d5ca6 100644 --- a/core/src/util/input_output.rs +++ b/core/src/util/input_output.rs @@ -268,6 +268,10 @@ pub struct CheckEmailInput { /// /// Defaults to http://localhost:9515. pub webdriver_addr: String, + /// The WebDriver configuration to use for headless verifications. + /// + /// Defaults to the default WebdriverConfig. + pub webdriver_config: WebdriverConfig, /// Identifier for the service currently running Reacher. We recommend /// setting this to an unique identifier of the server where Reacher is /// installed on. @@ -297,6 +301,7 @@ impl Default for CheckEmailInput { haveibeenpwned_api_key: None, retries: 1, webdriver_addr: "http://localhost:9515".into(), + webdriver_config: WebdriverConfig::default(), backend_name: "backend-dev".into(), sentry_dsn: None, } @@ -439,6 +444,17 @@ impl Serialize for CheckEmailOutput { } } +#[derive(Builder, Clone, Debug, Deserialize, Serialize)] +pub struct WebdriverConfig { + pub binary: Option, +} + +impl Default for WebdriverConfig { + fn default() -> Self { + WebdriverConfig { binary: None } + } +} + #[cfg(test)] mod tests { use super::{CheckEmailOutput, DebugDetails}; diff --git a/sqs/.gitignore b/sqs/.gitignore new file mode 100644 index 000000000..2bc392490 --- /dev/null +++ b/sqs/.gitignore @@ -0,0 +1,2 @@ +terraform* +.terraform* \ No newline at end of file diff --git a/sqs/Cargo.toml b/sqs/Cargo.toml new file mode 100644 index 000000000..40f2d0949 --- /dev/null +++ b/sqs/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "reacher_sqs" +version = "0.10.2" +edition = "2018" +license = "AGPL-3.0" +publish = false + +[dependencies] +check-if-email-exists = { path = "../core", features = ["sentry"] } +lambda_runtime = "0.13" +reacher_backend = { path = "../backend" } +reqwest = { version = "0.12.5", default-features = false, features = [ + "rustls-tls", +] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.40", features = ["macros"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["json"] } diff --git a/sqs/Dockerfile b/sqs/Dockerfile new file mode 100644 index 000000000..bb2acc916 --- /dev/null +++ b/sqs/Dockerfile @@ -0,0 +1,33 @@ +FROM messense/rust-musl-cross:x86_64-musl AS builder + +# Set the working directory +WORKDIR /usr/src/app + +# Copy the Rust project files +COPY . . + +# Build the release binary for the musl target +RUN cargo build --release --bin reacher_sqs --target x86_64-unknown-linux-musl + +# Install dependencies for ChromeDriver +FROM public.ecr.aws/lambda/provided:al2 AS runtime + +# Install dependencies +# From: https://github.com/umihico/docker-selenium-lambda/blob/73b511330db1d4d160264b00e0f25521de6b0615/Dockerfile +RUN yum install -y unzip && \ + curl -Lo "/tmp/chromedriver-linux64.zip" "https://storage.googleapis.com/chrome-for-testing-public/131.0.6778.204/linux64/chromedriver-linux64.zip" && \ + curl -Lo "/tmp/chrome-linux64.zip" "https://storage.googleapis.com/chrome-for-testing-public/131.0.6778.204/linux64/chrome-linux64.zip" && \ + unzip /tmp/chromedriver-linux64.zip -d /opt/ && \ + unzip /tmp/chrome-linux64.zip -d /opt/ +RUN yum install -y atk cups-libs gtk3 libXcomposite alsa-lib \ + libXcursor libXdamage libXext libXi libXrandr libXScrnSaver \ + libXtst pango at-spi2-atk libXt xorg-x11-server-Xvfb \ + xorg-x11-xauth dbus-glib dbus-glib-devel nss mesa-libgbm + +# Copy the statically linked binary to the runtime's expected location +COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/reacher_sqs /var/runtime/bootstrap +# The config file must be as ./backend_config.toml +COPY --from=builder /usr/src/app/backend/backend_config.toml . + +# Set the Lambda handler (entry point) +CMD ["/var/runtime/bootstrap"] diff --git a/sqs/README.md b/sqs/README.md new file mode 100644 index 000000000..98d24ad9e --- /dev/null +++ b/sqs/README.md @@ -0,0 +1,7 @@ +# SQS + +## Build Docker + +```bash +docker buildx build --platform linux/amd64 -t 430118836964.dkr.ecr.eu-west-3.amazonaws.com/reacherhq/sqs:$TAG -f sqs/Dockerfile . +``` diff --git a/sqs/main.tf b/sqs/main.tf new file mode 100644 index 000000000..ca05084d0 --- /dev/null +++ b/sqs/main.tf @@ -0,0 +1,183 @@ +provider "aws" { + region = var.aws_region +} + +# Define variables for reuse +variable "aws_region" { + default = "eu-west-3" +} + +variable "queue_name" { + default = "check-email-queue" +} + +variable "dlq_name" { + default = "check-email-dlq" +} + +variable "lambda_name" { + default = "lambda-task-check-email" +} + +variable "repository_name" { + default = "reacherhq/sqs" +} + +# Environment variables. Set as TF_VAR_{name} before running `terraform apply`. +variable "proxy_host" {} +variable "proxy_port" {} +variable "proxy_username" {} +variable "proxy_password" {} +variable "from_email" {} +variable "hello_name" {} + +# Create an SQS Queue +resource "aws_sqs_queue" "check_email_queue" { + name = var.queue_name + + redrive_policy = jsonencode({ + deadLetterTargetArn = aws_sqs_queue.check_email_dlq.arn + maxReceiveCount = 3 + }) + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# Create a Dead Letter Queue (DLQ) +resource "aws_sqs_queue" "check_email_dlq" { + name = var.dlq_name + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# IAM Role for Lambda execution +resource "aws_iam_role" "lambda_execution_role" { + name = "lambda_execution_role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Action = "sts:AssumeRole", + Principal = { + Service = "lambda.amazonaws.com" + }, + Effect = "Allow" + } + ] + }) + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# IAM Policy for Lambda +resource "aws_iam_policy" "lambda_policy" { + name = "lambda_sqs_cloudwatch_policy" + description = "IAM policy for Lambda SQS and CloudWatch" + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Action = [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + Resource = aws_sqs_queue.check_email_queue.arn + }, + { + Effect = "Allow", + Action = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + Resource = "arn:aws:logs:*:*:*" + } + ] + }) + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# Attach IAM policy to Lambda role +resource "aws_iam_role_policy_attachment" "attach_lambda_policy" { + role = aws_iam_role.lambda_execution_role.name + policy_arn = aws_iam_policy.lambda_policy.arn +} + +# Lambda Function +resource "aws_lambda_function" "lambda_task_check_email" { + function_name = var.lambda_name + role = aws_iam_role.lambda_execution_role.arn + + # ECR repository image + package_type = "Image" + image_uri = "${aws_ecr_repository.lambda_ecr_repo.repository_url}:beta" + + memory_size = 1024 + + environment { + variables = { + RUST_LOG = "debug" + RCH__PROXY__HOST = var.proxy_host + RCH__PROXY__PORT = var.proxy_port + RCH__PROXY__USERNAME = var.proxy_username + RCH__PROXY__PASSWORD = var.proxy_password + RCH__FROM_EMAIL = var.from_email + RCH__HELLO_NAME = var.hello_name + RCH__WEBDRIVER__BINARY = "/opt/chrome-linux64/chrome" + } + } + + timeout = 120 # Timeout set to 2 minutes, which corresponds to the max time one email verification should run, plus buffer. + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# Connect SQS Queue to Lambda as event source +resource "aws_lambda_event_source_mapping" "sqs_trigger" { + event_source_arn = aws_sqs_queue.check_email_queue.arn + function_name = aws_lambda_function.lambda_task_check_email.arn + + batch_size = 1 +} + +# ECR Repository for Lambda image +resource "aws_ecr_repository" "lambda_ecr_repo" { + name = var.repository_name + + tags = { + Environment = "Production" + ManagedBy = "Terraform" + } +} + +# Output SQS Queue URL +output "queue_url" { + value = aws_sqs_queue.check_email_queue.url + description = "The URL of the SQS queue." +} + +# Output DLQ URL +output "dlq_url" { + value = aws_sqs_queue.check_email_dlq.url + description = "The URL of the Dead Letter Queue." +} diff --git a/sqs/src/main.rs b/sqs/src/main.rs new file mode 100644 index 000000000..4e3e2095d --- /dev/null +++ b/sqs/src/main.rs @@ -0,0 +1,157 @@ +// check-if-email-exists +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use check_if_email_exists::CheckEmailOutput; +use lambda_runtime::{service_fn, Error, LambdaEvent}; +use reacher_backend::config::{load_config, BackendConfig}; +use reacher_backend::http::CheckEmailRequest; +use reacher_backend::storage::commercial_license_trial::send_to_reacher; +use reacher_backend::worker::do_work::{ + check_email_and_send_result, CheckEmailJobId, CheckEmailTask, TaskWebhook, +}; +use serde::Deserialize; +use std::process::Command; +use std::sync::Arc; +use tracing::{debug, info}; +use tracing_subscriber::EnvFilter; + +const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); + +// Too bad aws_sdk_sqs::types::Message is not Deserialize, now we need to +// create our own struct to deserialize the message. +#[derive(Debug, Deserialize)] +struct SQSMessage { + body: String, +} + +/// The payload of the SQS event that's passed into the lambda. +#[derive(Debug, Deserialize)] +struct SQSPayload { + #[serde(rename = "Records")] + records: Vec, +} + +/// This is like CheckEmailTask, but where the input is a CheckEmailRequest +/// instead of a CheckEmailInput. We can transform the CheckEmailRequest into +/// a CheckEmailInput by calling to_check_email_input. +#[derive(Debug, Deserialize)] +struct CheckEmailPartialTask { + input: CheckEmailRequest, + job_id: CheckEmailJobId, + webhook: Option, +} + +impl CheckEmailPartialTask { + fn into_check_email_task(self, backend_config: Arc) -> CheckEmailTask { + CheckEmailTask { + input: self.input.to_check_email_input(backend_config), + job_id: self.job_id, + webhook: self.webhook, + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + // Setting a filter based on the value of the `RUST_LOG` environment + .with_env_filter(EnvFilter::from_default_env()) + .json() + // this needs to be set to remove duplicated information in the log. + .with_current_span(false) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + // remove the name of the function from every log entry + .with_target(false) + .init(); + info!(version=?CARGO_PKG_VERSION, "Starting Reacher SQS lambda."); + + run_and_wait_chromedriver().await?; + + lambda_runtime::run(service_fn(handler)).await?; + Ok(()) +} + +async fn handler(event: LambdaEvent) -> Result { + let (request, _context) = event.into_parts(); + // Since we're only fetching a single message, we can safely unwrap here. + let message = request.records.first().expect("No messages in the event"); + let task: CheckEmailPartialTask = serde_json::from_str(&message.body)?; + info!(email = ?task.input.to_email, "Processing task"); + + let backend_config = Arc::new(load_config().await?); + debug!("{:#?}", backend_config); + + let task = &task.into_check_email_task(backend_config.clone()); + + let worker_output = check_email_and_send_result(&task).await; + match worker_output.as_ref() { + Ok(output) => { + info!(email = ?output.input, is_reachable = ?output.is_reachable, "Task completed"); + } + Err(e) => { + info!(email = ?task.input.to_email, err = ?e, "Task failed"); + } + } + + // TODO: + // - Refactor storing the result and sending to Reacher, it's duplicated + // code from the backend. + // - Add throttling, again using backend code. + + // Store the result. + let storage = backend_config.get_storage_adapter(); + storage + .store(task, &worker_output, storage.get_extra()) + .await?; + + // If we're in the Commercial License Trial, we also store the + // result by sending it to back to Reacher. + send_to_reacher(backend_config, &task.input.to_email, &worker_output).await?; + + Ok(worker_output?) +} + +async fn run_and_wait_chromedriver() -> Result<(), Error> { + Command::new("/opt/chromedriver-linux64/chromedriver") + .arg("--port=9515") + .spawn()?; + + // Wait until the chromedriver is ready. + let mut attempts = 0; + let max_attempts = 10; + let delay = std::time::Duration::from_secs(1); + + while attempts < max_attempts { + if let Ok(output) = reqwest::get("http://localhost:9515/status").await { + if output.status().is_success() { + info!("Chromedriver is ready."); + break; + } + } + attempts += 1; + tokio::time::sleep(delay).await; + } + + if attempts == max_attempts { + return Err(Error::from("Chromedriver did not start in time")); + } + + Ok(()) +}