From 7cf8282f5da1a1f5f5ec9456693d6eb2c535b6e4 Mon Sep 17 00:00:00 2001 From: Dimitri Date: Thu, 19 Sep 2024 12:44:44 +0700 Subject: [PATCH] Prevent duplicates by checking if it is a valid reply --- Cargo.lock | 2 +- packages/relayer/Cargo.toml | 3 +- packages/relayer/src/database.rs | 48 ++++++++++++ packages/relayer/src/modules/mail.rs | 74 ++++++++++++++++--- .../src/modules/web_server/rest_api.rs | 41 ++-------- 5 files changed, 120 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 212f868..b060b89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4141,7 +4141,6 @@ dependencies = [ [[package]] name = "relayer-utils" version = "0.3.7" -source = "git+https://github.com/zkemail/relayer-utils.git?rev=94d78d6#94d78d67862b6d6c15bebac66d184c7557f6aff5" dependencies = [ "anyhow", "base64 0.21.7", @@ -4153,6 +4152,7 @@ dependencies = [ "hmac-sha256", "itertools 0.10.5", "lazy_static", + "mailparse", "neon", "num-bigint", "num-traits", diff --git a/packages/relayer/Cargo.toml b/packages/relayer/Cargo.toml index 2240108..30d0bb2 100644 --- a/packages/relayer/Cargo.toml +++ b/packages/relayer/Cargo.toml @@ -25,7 +25,8 @@ tiny_http = "0.12.0" lettre = { version = "0.10.4", features = ["tokio1", "tokio1-native-tls"] } ethers = { version = "2.0.10", features = ["abigen"] } # relayer-utils = { version = "0.3.7", git = "https://github.com/zkemail/relayer-utils.git" } -relayer-utils = { rev = "94d78d6", git = "https://github.com/zkemail/relayer-utils.git" } +# relayer-utils = { rev = "94d78d6", git = "https://github.com/zkemail/relayer-utils.git" } +relayer-utils = { path = "../../../relayer-utils" } futures = "0.3.28" sqlx = { version = "=0.7.3", features = ["postgres", "runtime-tokio"] } regex = "1.10.2" diff --git a/packages/relayer/src/database.rs b/packages/relayer/src/database.rs index 5998413..93de8ee 100644 --- a/packages/relayer/src/database.rs +++ b/packages/relayer/src/database.rs @@ -84,6 +84,17 @@ impl Database { ) .execute(&self.db) .await?; + + sqlx::query( + "CREATE TABLE IF NOT EXISTS expected_replies ( + message_id VARCHAR(255) PRIMARY KEY, + request_id VARCHAR(255), + has_reply BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP DEFAULT (NOW() AT TIME ZONE 'UTC') + );", + ) + .execute(&self.db) + .await?; Ok(()) } @@ -392,4 +403,41 @@ impl Database { info!(LOG, "Request inserted with request_id: {}", request_id); Ok(()) } + + pub(crate) async fn add_expected_reply( + &self, + message_id: &str, + request_id: Option, + ) -> Result<(), DatabaseError> { + let query = " + INSERT INTO expected_replies (message_id, request_id) + VALUES ($1, $2); + "; + sqlx::query(query) + .bind(message_id) + .bind(request_id) + .execute(&self.db) + .await + .map_err(|e| DatabaseError::new("Failed to insert expected_reply", e))?; + Ok(()) + } + + // Checks if the given message_id corresponds to a valid reply. + // This function updates the `has_reply` field to true if the message_id exists and hasn't been replied to yet. + // Returns true if the update was successful (i.e., a valid reply was recorded), false otherwise, + // also if no record exists to be updated. + pub(crate) async fn is_valid_reply(&self, message_id: &str) -> Result { + let query = " + UPDATE expected_replies + SET has_reply = true + WHERE message_id = $1 AND has_reply = false + RETURNING *; + "; + let result = sqlx::query(query) + .bind(message_id) + .execute(&self.db) + .await + .map_err(|e| DatabaseError::new("Failed to validate reply", e))?; + Ok(result.rows_affected() > 0) + } } diff --git a/packages/relayer/src/modules/mail.rs b/packages/relayer/src/modules/mail.rs index c90b6d8..d0fd85f 100644 --- a/packages/relayer/src/modules/mail.rs +++ b/packages/relayer/src/modules/mail.rs @@ -129,7 +129,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, Some(ExpectsReply::new(request_id))).await?; } EmailAuthEvent::Error { email_addr, @@ -161,7 +161,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::GuardianAlreadyExists { account_eth_addr, @@ -190,7 +190,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::RecoveryRequest { account_eth_addr, @@ -226,7 +226,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, Some(ExpectsReply::new(request_id))).await?; } EmailAuthEvent::AcceptanceSuccess { account_eth_addr, @@ -259,7 +259,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::RecoverySuccess { account_eth_addr, @@ -292,7 +292,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::GuardianNotSet { account_eth_addr, @@ -317,7 +317,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::GuardianNotRegistered { account_eth_addr, @@ -355,7 +355,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> body_attachments: None, }; - send_email(email).await?; + send_email(email, Some(ExpectsReply::new(request_id))).await?; } EmailAuthEvent::Ack { email_addr, @@ -379,7 +379,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError> reply_to: original_message_id, body_attachments: None, }; - send_email(email).await?; + send_email(email, None).await?; } EmailAuthEvent::NoOp => {} } @@ -457,7 +457,10 @@ pub fn parse_error(error: String) -> Result> { /// # Returns /// /// A `Result` indicating success or an `EmailError`. -pub async fn send_email(email: EmailMessage) -> Result<(), EmailError> { +pub async fn send_email( + email: EmailMessage, + expects_reply: Option, +) -> Result<(), EmailError> { let smtp_server = SMTP_SERVER.get().unwrap(); // Send POST request to email server @@ -476,5 +479,56 @@ pub async fn send_email(email: EmailMessage) -> Result<(), EmailError> { ))); } + if let Some(expects_reply) = expects_reply { + let response_body: EmailResponse = response + .json() + .await + .map_err(|e| EmailError::Parse(format!("Failed to parse response JSON: {}", e)))?; + + let message_id = response_body.message_id; + DB.add_expected_reply(&message_id, expects_reply.request_id) + .await?; + } + Ok(()) } + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct EmailResponse { + status: String, + message_id: String, +} + +pub struct ExpectsReply { + request_id: Option, +} + +impl ExpectsReply { + fn new(request_id: u32) -> Self { + Self { + request_id: Some(request_id.to_string()), + } + } + + fn new_no_request_id() -> Self { + Self { request_id: None } + } +} + +/// Checks if the email is a reply to a command that expects a reply. +/// Will return false for duplicate replies. +/// Will return true if the email is not a reply. +pub async fn check_is_valid_request(email: &ParsedEmail) -> Result { + let reply_message_id = match email + .headers + .get_header("In-Reply-To") + .and_then(|v| v.first().cloned()) + { + Some(id) => id, + // Email is not a reply + None => return Ok(true), + }; + + let is_valid = DB.is_valid_reply(&reply_message_id).await?; + Ok(is_valid) +} diff --git a/packages/relayer/src/modules/web_server/rest_api.rs b/packages/relayer/src/modules/web_server/rest_api.rs index df9bce3..add61e8 100644 --- a/packages/relayer/src/modules/web_server/rest_api.rs +++ b/packages/relayer/src/modules/web_server/rest_api.rs @@ -19,7 +19,6 @@ use std::str; pub async fn request_status_api( Json(payload): Json, ) -> Result, ApiError> { - println!("requesting status"); let row = DB.get_request(payload.request_id).await?; let status = if let Some(ref row) = row { if row.is_processed { @@ -53,7 +52,6 @@ pub async fn request_status_api( pub async fn handle_acceptance_request( Json(payload): Json, ) -> Result, ApiError> { - println!("handle_acceptance_request"); let command_template = CLIENT .get_acceptance_command_templates(&payload.controller_eth_addr, payload.template_idx) .await?; @@ -222,15 +220,12 @@ pub async fn handle_acceptance_request( pub async fn handle_recovery_request( Json(payload): Json, ) -> Result, ApiError> { - println!("handle_recovery_request: {:?}", payload); let command_template = CLIENT .get_recovery_command_templates(&payload.controller_eth_addr, payload.template_idx) .await?; - println!("command_template: {:?}", command_template); let command_params = extract_template_vals(&payload.command, command_template) .map_err(|_| ApiError::Validation("Invalid command".to_string()))?; - println!("command_params"); let account_eth_addr = CLIENT .get_recovered_account_from_recovery_command( @@ -240,22 +235,15 @@ pub async fn handle_recovery_request( ) .await?; - println!("account_eth_addr"); - let account_eth_addr = format!("0x{:x}", account_eth_addr); - println!("account_eth_addr"); if !CLIENT.is_wallet_deployed(&account_eth_addr).await? { return Err(ApiError::Validation("Wallet not deployed".to_string())); } - println!("wallet is deployed"); - // Check if hash of bytecode of proxy contract is equal or not let bytecode = CLIENT.get_bytecode(&account_eth_addr).await?; - println!("bytecode"); let bytecode_hash = format!("0x{}", hex::encode(keccak256(bytecode.as_ref()))); - println!("bytecode_hash"); // let permitted_wallets: Vec = // serde_json::from_str(include_str!("../../permitted_wallets.json")).unwrap(); @@ -304,37 +292,24 @@ pub async fn handle_recovery_request( // } let mut request_id = rand::thread_rng().gen::(); - println!("got request_id"); while let Ok(Some(request)) = DB.get_request(request_id).await { request_id = rand::thread_rng().gen::(); } - println!("got request: {:?}", request_id); - println!("account_eth_addr: {:?}", account_eth_addr); - println!( - "payload.guardian_email_addr: {:?}", - payload.guardian_email_addr - ); - let account = DB .get_credentials_from_wallet_and_email(&account_eth_addr, &payload.guardian_email_addr) .await?; - println!("got account: {:?}", account); - let account_salt = if let Some(account_details) = account { calculate_account_salt(&payload.guardian_email_addr, &account_details.account_code) } else { return Err(ApiError::Validation("Wallet not deployed".to_string())); }; - println!("got account_salt"); - if !DB .is_wallet_and_email_registered(&account_eth_addr, &payload.guardian_email_addr) .await? { - println!("email and wallet are not registered"); DB.insert_request(&Request { request_id, account_eth_addr: account_eth_addr.clone(), @@ -377,13 +352,10 @@ pub async fn handle_recovery_request( }) .await?; - println!("inserted request"); - if DB .is_guardian_set(&account_eth_addr, &payload.guardian_email_addr) .await? { - println!("guardian is set"); handle_email_event(EmailAuthEvent::RecoveryRequest { account_eth_addr, guardian_email_addr: payload.guardian_email_addr.clone(), @@ -394,7 +366,6 @@ pub async fn handle_recovery_request( // TODO: Add custom error for handle_email_event .expect("Failed to send Recovery event"); } else { - println!("guardian is not set"); handle_email_event(EmailAuthEvent::GuardianNotSet { account_eth_addr, guardian_email_addr: payload.guardian_email_addr.clone(), @@ -404,8 +375,6 @@ pub async fn handle_recovery_request( .expect("Failed to send Recovery event"); } - println!("all done"); - Ok(Json(RecoveryResponse { request_id, command_params, @@ -424,11 +393,9 @@ pub async fn handle_recovery_request( pub async fn handle_complete_recovery_request( Json(payload): Json, ) -> Result { - println!("handle_complete_recovery_request"); if !CLIENT.is_wallet_deployed(&payload.account_eth_addr).await? { return Err(ApiError::Validation("Wallet not deployed".to_string())); } - println!("wallet is deployed"); match CLIENT .complete_recovery( @@ -472,7 +439,6 @@ pub async fn handle_complete_recovery_request( pub async fn get_account_salt( Json(payload): Json, ) -> Result { - println!("get_account_salt"); let account_salt = calculate_account_salt(&payload.email_addr, &payload.account_code); Ok(account_salt) } @@ -489,7 +455,6 @@ pub async fn get_account_salt( pub async fn inactive_guardian( Json(payload): Json, ) -> Result { - println!("inactive_guardian"); let is_activated = CLIENT .get_is_activated(&payload.controller_eth_addr, &payload.account_eth_addr) .await?; @@ -535,11 +500,15 @@ fn parse_error_message(error_data: String) -> String { /// /// A `Result` containing `()` or an `ApiError`. pub async fn receive_email_api_fn(email: String) -> Result<(), ApiError> { - println!("receive_email_api_fn"); let parsed_email = ParsedEmail::new_from_raw_email(&email).await?; let from_addr = parsed_email.get_from_addr()?; let original_subject = parsed_email.get_subject_all()?; tokio::spawn(async move { + if !check_is_valid_request(&parsed_email).await.unwrap() { + trace!(LOG, "Got a non valid email request. Ignoring."); + return; + } + match handle_email_event(EmailAuthEvent::Ack { email_addr: from_addr.clone(), command: parsed_email.get_command(false).unwrap_or_default(),