Skip to content

Commit

Permalink
chore(notifications): complete the logic for sending email
Browse files Browse the repository at this point in the history
  • Loading branch information
thevaibhav-dixit committed Feb 18, 2024
1 parent 74b4986 commit 8466b93
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 26 deletions.
1 change: 1 addition & 0 deletions core/notifications/notifications.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ app:
smtp:
username: ""
from_email: ""
relay: ""
9 changes: 6 additions & 3 deletions core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pub struct NotificationsApp {
impl NotificationsApp {
pub async fn init(pool: Pool<Postgres>, config: AppConfig) -> Result<Self, ApplicationError> {
let settings = UserNotificationSettingsRepo::new(&pool);
let executor = PushExecutor::init(config.executor.clone(), settings.clone()).await?;
let _email_executor = EmailExecutor::init(config.email_executor.clone(), settings.clone())?;
let runner = job::start_job_runner(&pool, executor).await?;
let push_executor = PushExecutor::init(config.executor.clone(), settings.clone()).await?;
let email_executor = EmailExecutor::init(config.email_executor.clone(), settings.clone())?;
let runner = job::start_job_runner(&pool, push_executor, email_executor).await?;
Ok(Self {
_config: config,
pool,
Expand Down Expand Up @@ -175,6 +175,9 @@ impl NotificationsApp {
event: T,
) -> Result<(), ApplicationError> {
let mut tx = self.pool.begin().await?;
if event.should_send_email() {
job::spawn_send_email_notification(&mut tx, event.clone().into()).await?;
}
job::spawn_send_push_notification(&mut tx, event.into()).await?;
tx.commit().await?;
Ok(())
Expand Down
16 changes: 13 additions & 3 deletions core/notifications/src/email_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod config;
pub mod error;
mod smtp;

use tracing::instrument;

use crate::{notification_event::*, user_notification_settings::*};

pub use config::*;
Expand All @@ -28,10 +30,18 @@ impl EmailExecutor {
})
}

#[instrument(name = "email_executor.notify", skip(self))]
pub async fn notify<T: NotificationEvent>(&self, event: &T) -> Result<(), EmailExecutorError> {
let settings = self.settings.find_for_user_id(event.user_id()).await?;
let msg = event.to_localized_msg(settings.locale().unwrap_or_default());
self.smtp.send_email(msg).await?;
if let Some((settings, addr)) = self
.settings
.find_for_user_id(event.user_id())
.await
.ok()
.and_then(|s| s.email_address().map(|addr| (s, addr)))
{
let msg = event.to_localized_msg(settings.locale().unwrap_or_default());
self.smtp.send_email(msg, addr).await?;
}
Ok(())
}
}
1 change: 1 addition & 0 deletions core/notifications/src/email_executor/smtp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pub struct SmtpConfig {
#[serde(default)]
pub password: String,
pub from_email: String,
pub relay: String,
}
12 changes: 8 additions & 4 deletions core/notifications/src/email_executor/smtp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use lettre::{
AsyncSmtpTransport, AsyncTransport, Tokio1Executor,
};

use crate::messages::LocalizedMessage;
use crate::{messages::LocalizedMessage, primitives::GaloyEmailAddress};

pub use config::*;
use error::*;
Expand All @@ -22,7 +22,7 @@ impl SmtpClient {
pub fn init(config: SmtpConfig) -> Result<Self, SmtpError> {
let creds = Credentials::new(config.username, config.password);
let client: AsyncSmtpTransport<Tokio1Executor> =
AsyncSmtpTransport::<Tokio1Executor>::starttls_relay("smtp.gmail.com")?
AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&config.relay)?
.credentials(creds)
.build();
Ok(Self {
Expand All @@ -31,10 +31,14 @@ impl SmtpClient {
})
}

pub async fn send_email(&self, msg: LocalizedMessage) -> Result<(), SmtpError> {
pub async fn send_email(
&self,
msg: LocalizedMessage,
recipient_addr: GaloyEmailAddress,
) -> Result<(), SmtpError> {
let email = Message::builder()
.from(Mailbox::new(None, self.from_email.parse()?))
.to(Mailbox::new(None, "some-email".parse()?))
.to(Mailbox::new(None, recipient_addr.into_inner().parse()?))
.subject(msg.title)
.body(msg.body)?;
self.client.send(email).await?;
Expand Down
6 changes: 4 additions & 2 deletions core/notifications/src/job/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use thiserror::Error;

use crate::push_executor::error::PushExecutorError;
use crate::{email_executor::error::EmailExecutorError, push_executor::error::PushExecutorError};

#[derive(Error, Debug)]
pub enum JobError {
#[error("JobError - Sqlx: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("JobError - ExecutorError: {0}")]
#[error("JobError - PushExecutorError: {0}")]
PushExecutor(#[from] PushExecutorError),
#[error("JobError - EmailExecutorError: {0}")]
EmailExecutor(#[from] EmailExecutorError),
}

impl job_executor::JobExecutionError for JobError {}
51 changes: 47 additions & 4 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod send_email_notification;
mod send_push_notification;

pub mod error;
Expand All @@ -7,18 +8,21 @@ use tracing::instrument;

use job_executor::JobExecutor;

use crate::push_executor::PushExecutor;
use crate::{email_executor::EmailExecutor, push_executor::PushExecutor};

use error::JobError;

use send_email_notification::SendEmailNotificationData;
use send_push_notification::SendPushNotificationData;

pub async fn start_job_runner(
pool: &sqlx::PgPool,
executor: PushExecutor,
push_executor: PushExecutor,
email_executor: EmailExecutor,
) -> Result<JobRunnerHandle, JobError> {
let mut registry = JobRegistry::new(&[send_push_notification]);
registry.set_context(executor);
let mut registry = JobRegistry::new(&[send_push_notification, send_email_notification]);
registry.set_context(push_executor);
registry.set_context(email_executor);

Ok(registry.runner(pool).set_keep_alive(false).run().await?)
}
Expand Down Expand Up @@ -61,3 +65,42 @@ pub async fn spawn_send_push_notification(
}
Ok(())
}

#[job(
name = "send_email_notification",
channel_name = "send_email_notification"
)]
async fn send_email_notification(
mut current_job: CurrentJob,
executor: EmailExecutor,
) -> Result<(), JobError> {
JobExecutor::builder(&mut current_job)
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: SendEmailNotificationData =
data.expect("no SendEmailNotificationData available");
send_email_notification::execute(data, executor).await
})
.await?;
Ok(())
}

#[instrument(name = "job.spawn_send_email_notification", skip_all, fields(error, error.level, error.message), err)]
pub async fn spawn_send_email_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<SendEmailNotificationData>,
) -> Result<(), JobError> {
let data = data.into();
if let Err(e) = send_email_notification
.builder()
.set_json(&data)
.expect("Couldn't set json")
.spawn(&mut **tx)
.await
{
tracing::insert_error_fields(tracing::Level::WARN, &e);
return Err(e.into());
}
Ok(())
}
32 changes: 32 additions & 0 deletions core/notifications/src/job/send_email_notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;

use std::collections::HashMap;

use super::error::JobError;
use crate::{email_executor::EmailExecutor, notification_event::NotificationEventPayload};

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct SendEmailNotificationData {
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<NotificationEventPayload> for SendEmailNotificationData {
fn from(payload: NotificationEventPayload) -> Self {
Self {
payload,
tracing_data: tracing::extract_tracing_data(),
}
}
}

#[instrument(name = "job.send_email_notification", skip(executor), err)]
pub async fn execute(
data: SendEmailNotificationData,
executor: EmailExecutor,
) -> Result<SendEmailNotificationData, JobError> {
executor.notify(&data.payload).await?;
Ok(data)
}
53 changes: 45 additions & 8 deletions core/notifications/src/notification_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ pub enum DeepLink {
Circles,
}

pub trait NotificationEvent: std::fmt::Debug + Into<NotificationEventPayload> {
pub trait NotificationEvent: std::fmt::Debug + Into<NotificationEventPayload> + Clone {
fn category(&self) -> UserNotificationCategory;
fn user_id(&self) -> &GaloyUserId;
fn deep_link(&self) -> DeepLink;
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage;
fn should_send_email(&self) -> bool;
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum NotificationEventPayload {
CircleGrew(CircleGrew),
Expand Down Expand Up @@ -72,9 +73,25 @@ impl NotificationEvent for NotificationEventPayload {
}
}
}

fn should_send_email(&self) -> bool {
match self {
NotificationEventPayload::CircleGrew(event) => event.should_send_email(),
NotificationEventPayload::CircleThresholdReached(event) => event.should_send_email(),
NotificationEventPayload::IdentityVerificationApproved(event) => {
event.should_send_email()
}
NotificationEventPayload::IdentityVerificationDeclined(event) => {
event.should_send_email()
}
NotificationEventPayload::IdentityVerificationReviewPending(event) => {
event.should_send_email()
}
}
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CircleGrew {
pub user_id: GaloyUserId,
pub circle_type: CircleType,
Expand All @@ -98,6 +115,10 @@ impl NotificationEvent for CircleGrew {
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
Messages::circle_grew(locale.as_ref(), self)
}

fn should_send_email(&self) -> bool {
false
}
}

impl From<CircleGrew> for NotificationEventPayload {
Expand All @@ -106,7 +127,7 @@ impl From<CircleGrew> for NotificationEventPayload {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CircleThresholdReached {
pub user_id: GaloyUserId,
pub circle_type: CircleType,
Expand All @@ -130,6 +151,10 @@ impl NotificationEvent for CircleThresholdReached {
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
Messages::circle_threshold_reached(locale.as_ref(), self)
}

fn should_send_email(&self) -> bool {
false
}
}

impl From<CircleThresholdReached> for NotificationEventPayload {
Expand All @@ -138,7 +163,7 @@ impl From<CircleThresholdReached> for NotificationEventPayload {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdentityVerificationApproved {
pub user_id: GaloyUserId,
}
Expand All @@ -159,6 +184,10 @@ impl NotificationEvent for IdentityVerificationApproved {
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
Messages::identity_verification_approved(locale.as_ref(), self)
}

fn should_send_email(&self) -> bool {
true
}
}

impl From<IdentityVerificationApproved> for NotificationEventPayload {
Expand All @@ -167,13 +196,13 @@ impl From<IdentityVerificationApproved> for NotificationEventPayload {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum IdentityVerificationDeclinedReason {
DocumentsNotClear,
VerificationPhotoNotClear,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdentityVerificationDeclined {
pub user_id: GaloyUserId,
pub declined_reason: IdentityVerificationDeclinedReason,
Expand All @@ -195,6 +224,10 @@ impl NotificationEvent for IdentityVerificationDeclined {
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
Messages::identity_verification_declined(locale.as_ref(), self)
}

fn should_send_email(&self) -> bool {
true
}
}

impl From<IdentityVerificationDeclined> for NotificationEventPayload {
Expand All @@ -203,7 +236,7 @@ impl From<IdentityVerificationDeclined> for NotificationEventPayload {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IdentityVerificationReviewPending {
pub user_id: GaloyUserId,
}
Expand All @@ -224,6 +257,10 @@ impl NotificationEvent for IdentityVerificationReviewPending {
fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage {
Messages::identity_verification_review_pending(locale.as_ref(), self)
}

fn should_send_email(&self) -> bool {
true
}
}

impl From<IdentityVerificationReviewPending> for NotificationEventPayload {
Expand Down
4 changes: 2 additions & 2 deletions core/notifications/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub enum UserNotificationCategory {
AdminNotification,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum CircleType {
Inner,
Outer,
Expand All @@ -135,7 +135,7 @@ impl std::fmt::Display for CircleType {
}
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum CircleTimeFrame {
Month,
AllTime,
Expand Down

0 comments on commit 8466b93

Please sign in to comment.