Skip to content

Commit

Permalink
Prevent duplicates by checking if it is a valid reply
Browse files Browse the repository at this point in the history
  • Loading branch information
DimiDumo committed Sep 19, 2024
1 parent 164ea7c commit 7cf8282
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ tiny_http = "0.12.0"
lettre = { version = "0.10.4", features = ["tokio1", "tokio1-native-tls"] }
ethers = { version = "2.0.10", features = ["abigen"] }
# relayer-utils = { version = "0.3.7", git = "https://github.com/zkemail/relayer-utils.git" }
relayer-utils = { rev = "94d78d6", git = "https://github.com/zkemail/relayer-utils.git" }
# relayer-utils = { rev = "94d78d6", git = "https://github.com/zkemail/relayer-utils.git" }
relayer-utils = { path = "../../../relayer-utils" }
futures = "0.3.28"
sqlx = { version = "=0.7.3", features = ["postgres", "runtime-tokio"] }
regex = "1.10.2"
Expand Down
48 changes: 48 additions & 0 deletions packages/relayer/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ impl Database {
)
.execute(&self.db)
.await?;

sqlx::query(
"CREATE TABLE IF NOT EXISTS expected_replies (
message_id VARCHAR(255) PRIMARY KEY,
request_id VARCHAR(255),
has_reply BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT (NOW() AT TIME ZONE 'UTC')
);",
)
.execute(&self.db)
.await?;
Ok(())
}

Expand Down Expand Up @@ -392,4 +403,41 @@ impl Database {
info!(LOG, "Request inserted with request_id: {}", request_id);
Ok(())
}

pub(crate) async fn add_expected_reply(
&self,
message_id: &str,
request_id: Option<String>,
) -> Result<(), DatabaseError> {
let query = "
INSERT INTO expected_replies (message_id, request_id)
VALUES ($1, $2);
";
sqlx::query(query)
.bind(message_id)
.bind(request_id)
.execute(&self.db)
.await
.map_err(|e| DatabaseError::new("Failed to insert expected_reply", e))?;
Ok(())
}

// Checks if the given message_id corresponds to a valid reply.
// This function updates the `has_reply` field to true if the message_id exists and hasn't been replied to yet.
// Returns true if the update was successful (i.e., a valid reply was recorded), false otherwise,
// also if no record exists to be updated.
pub(crate) async fn is_valid_reply(&self, message_id: &str) -> Result<bool, DatabaseError> {
let query = "
UPDATE expected_replies
SET has_reply = true
WHERE message_id = $1 AND has_reply = false
RETURNING *;
";
let result = sqlx::query(query)
.bind(message_id)
.execute(&self.db)
.await
.map_err(|e| DatabaseError::new("Failed to validate reply", e))?;
Ok(result.rows_affected() > 0)
}
}
74 changes: 64 additions & 10 deletions packages/relayer/src/modules/mail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, Some(ExpectsReply::new(request_id))).await?;
}
EmailAuthEvent::Error {
email_addr,
Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::GuardianAlreadyExists {
account_eth_addr,
Expand Down Expand Up @@ -190,7 +190,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::RecoveryRequest {
account_eth_addr,
Expand Down Expand Up @@ -226,7 +226,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, Some(ExpectsReply::new(request_id))).await?;
}
EmailAuthEvent::AcceptanceSuccess {
account_eth_addr,
Expand Down Expand Up @@ -259,7 +259,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::RecoverySuccess {
account_eth_addr,
Expand Down Expand Up @@ -292,7 +292,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::GuardianNotSet {
account_eth_addr,
Expand All @@ -317,7 +317,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::GuardianNotRegistered {
account_eth_addr,
Expand Down Expand Up @@ -355,7 +355,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
body_attachments: None,
};

send_email(email).await?;
send_email(email, Some(ExpectsReply::new(request_id))).await?;
}
EmailAuthEvent::Ack {
email_addr,
Expand All @@ -379,7 +379,7 @@ pub async fn handle_email_event(event: EmailAuthEvent) -> Result<(), EmailError>
reply_to: original_message_id,
body_attachments: None,
};
send_email(email).await?;
send_email(email, None).await?;
}
EmailAuthEvent::NoOp => {}
}
Expand Down Expand Up @@ -457,7 +457,10 @@ pub fn parse_error(error: String) -> Result<Option<String>> {
/// # Returns
///
/// A `Result` indicating success or an `EmailError`.
pub async fn send_email(email: EmailMessage) -> Result<(), EmailError> {
pub async fn send_email(
email: EmailMessage,
expects_reply: Option<ExpectsReply>,
) -> Result<(), EmailError> {
let smtp_server = SMTP_SERVER.get().unwrap();

// Send POST request to email server
Expand All @@ -476,5 +479,56 @@ pub async fn send_email(email: EmailMessage) -> Result<(), EmailError> {
)));
}

if let Some(expects_reply) = expects_reply {
let response_body: EmailResponse = response
.json()
.await
.map_err(|e| EmailError::Parse(format!("Failed to parse response JSON: {}", e)))?;

let message_id = response_body.message_id;
DB.add_expected_reply(&message_id, expects_reply.request_id)
.await?;
}

Ok(())
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct EmailResponse {
status: String,
message_id: String,
}

pub struct ExpectsReply {
request_id: Option<String>,
}

impl ExpectsReply {
fn new(request_id: u32) -> Self {
Self {
request_id: Some(request_id.to_string()),
}
}

fn new_no_request_id() -> Self {
Self { request_id: None }
}
}

/// Checks if the email is a reply to a command that expects a reply.
/// Will return false for duplicate replies.
/// Will return true if the email is not a reply.
pub async fn check_is_valid_request(email: &ParsedEmail) -> Result<bool, EmailError> {
let reply_message_id = match email
.headers
.get_header("In-Reply-To")
.and_then(|v| v.first().cloned())
{
Some(id) => id,
// Email is not a reply
None => return Ok(true),
};

let is_valid = DB.is_valid_reply(&reply_message_id).await?;
Ok(is_valid)
}
Loading

0 comments on commit 7cf8282

Please sign in to comment.