Skip to content

Commit

Permalink
refactor(notifications): remove novu in favor of fcm
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 7, 2024
1 parent ee80323 commit aa0580e
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 1,020 deletions.
6 changes: 0 additions & 6 deletions core/notifications/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ fn default_tracing_config() -> TracingConfig {
pub struct EnvOverride {
pub db_con: String,
pub mongodb_connection: Option<String>,
pub novu_api_key: Option<String>,
}

impl Config {
Expand All @@ -43,7 +42,6 @@ impl Config {
EnvOverride {
db_con,
mongodb_connection,
novu_api_key,
}: EnvOverride,
) -> anyhow::Result<Self> {
let mut config: Config = if let Some(path) = path {
Expand All @@ -56,10 +54,6 @@ impl Config {
config.db.pg_con = db_con;
config.mongo_import.connection = mongodb_connection;

if let Some(novu_api_key) = novu_api_key {
config.app.executor.novu.api_key = novu_api_key;
}

config.app.executor.fcm.load_creds()?;

Ok(config)
Expand Down
3 changes: 0 additions & 3 deletions core/notifications/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ struct Cli {
pg_con: String,
#[clap(env = "MONGODB_CON")]
mongodb_connection: Option<String>,
#[clap(env = "NOVU_API_KEY")]
novu_api_key: Option<String>,
}

pub async fn run() -> anyhow::Result<()> {
Expand All @@ -28,7 +26,6 @@ pub async fn run() -> anyhow::Result<()> {
EnvOverride {
db_con: cli.pg_con,
mongodb_connection: cli.mongodb_connection,
novu_api_key: cli.novu_api_key,
},
)?;

Expand Down
43 changes: 0 additions & 43 deletions core/notifications/src/executor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,5 @@ use super::fcm::FcmConfig;

#[derive(Clone, Default, Debug, Deserialize, Serialize)]
pub struct ExecutorConfig {
pub novu: NovuConfig,
pub fcm: FcmConfig,
}

#[derive(Clone, Default, Debug, Deserialize, Serialize)]
pub struct NovuConfig {
#[serde(default)]
pub api_key: String,
pub workflows: NovuWorkflows,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct NovuWorkflows {
#[serde(default = "default_circle_grew_workflow_id")]
pub circle_grew: String,
#[serde(default = "default_circle_threshold_reached_workflow_id")]
pub circle_threshold_reached: String,
}

impl NovuWorkflows {
pub fn for_name(&self, name: &str) -> String {
match name {
"circle_grew" => self.circle_grew.clone(),
"circle_threshold_reached" => self.circle_threshold_reached.clone(),
_ => unreachable!("unknown workflow name: {}", name),
}
}
}

impl Default for NovuWorkflows {
fn default() -> Self {
Self {
circle_grew: default_circle_grew_workflow_id(),
circle_threshold_reached: default_circle_threshold_reached_workflow_id(),
}
}
}

fn default_circle_grew_workflow_id() -> String {
String::from("circle_grew")
}

fn default_circle_threshold_reached_workflow_id() -> String {
String::from("circle_threshold_reached")
}
4 changes: 1 addition & 3 deletions core/notifications/src/executor/error.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use thiserror::Error;

use super::{fcm::error::FcmError, novu::error::NovuError};
use super::fcm::error::FcmError;
use crate::user_notification_settings::error::*;

#[derive(Error, Debug)]
pub enum ExecutorError {
#[error("ExecutorError - Novu: {0}")]
Novu(#[from] NovuError),
#[error("ExecutorError - Novu: {0}")]
Fcm(#[from] FcmError),
#[error("ExecutorError - UserNotificationSettingsError: {0}")]
Expand Down
151 changes: 8 additions & 143 deletions core/notifications/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
mod config;
pub mod error;
mod fcm;
mod novu;

use fcm::FcmClient;
use novu::{events::*, subscriber::*, NovuClient};
use std::collections::HashMap;

use crate::{
messages::LocalizedMessage, notification_event::*, primitives::*, user_notification_settings::*,
};
use crate::{notification_event::*, primitives::*, user_notification_settings::*};

pub use config::*;
use error::*;
pub use novu::events::AllowedPayloadValues;

#[derive(Clone)]
pub struct Executor {
config: ExecutorConfig,
novu: NovuClient,
_config: ExecutorConfig,
fcm: FcmClient,
settings: UserNotificationSettingsRepo,
}
Expand All @@ -28,38 +21,14 @@ impl Executor {
mut config: ExecutorConfig,
settings: UserNotificationSettingsRepo,
) -> Result<Self, ExecutorError> {
let _fcm = FcmClient::init(config.fcm.service_account_key()).await?;
Ok(Self {
novu: NovuClient::new(config.novu.api_key.clone(), None)?,
config,
fcm: FcmClient::init(config.fcm.service_account_key()).await?,
_config: config,
settings,
fcm: _fcm,
})
}

pub async fn notify_new<T: Into<LocalizedMessage>>(
&self,
user_id: GaloyUserId,
msg: T,
) -> Result<(), ExecutorError> {
let settings = self.settings.find_for_user_id(&user_id).await?;
if !settings.should_send_notification(
UserNotificationChannel::Push,
UserNotificationCategory::Circles,
) {
return Ok(());
}

let msg = msg.into();

self.fcm
.send(settings.push_device_tokens(), msg, DeepLink::Circles)
.await?;

Ok(())
}

pub async fn notify<T: NotificationEvent>(&self, event: T) -> Result<(), ExecutorError> {
pub async fn notify<T: NotificationEventNew>(&self, event: T) -> Result<(), ExecutorError> {
let settings = self.settings.find_for_user_id(event.user_id()).await?;
if !settings.should_send_notification(
UserNotificationChannel::Push,
Expand All @@ -68,116 +37,12 @@ impl Executor {
return Ok(());
}

self.create_subscriber_for_galoy_id(event.user_id()).await?;
self.update_push_credentials(event.user_id(), &settings.push_device_tokens())
.await?;

let user_id = event.user_id().to_string();
let payload = event.into_payload();
let overrides = <T as NotificationEvent>::into_overrides();
let msg = event.to_localized_msg(settings.locale().unwrap_or_default());

let id = self
.config
.novu
.workflows
.for_name(<T as NotificationEvent>::workflow_name());

self.trigger_workflow(user_id, id, payload, overrides)
.await?;

Ok(())
}

async fn trigger_workflow(
&self,
user_id: String,
trigger_name: String,
payload_data: HashMap<String, AllowedPayloadValues>,
overrides: Option<HashMap<String, serde_json::Value>>,
) -> Result<(), ExecutorError> {
self.novu
.trigger(TriggerPayload {
name: trigger_name,
payload: payload_data,
to: TriggerRecipientsType::Single(TriggerRecipientBuilder::new(user_id).build()),
overrides,
})
.await?;

Ok(())
}

async fn update_push_credentials(
&self,
user_id: &GaloyUserId,
push_device_tokens: impl IntoIterator<Item = &PushDeviceToken>,
) -> Result<(), ExecutorError> {
self.novu
.subscribers
.update_credentials(
user_id.to_string(),
UpdateCredentialsPayload {
provider_id: ProviderId::Fcm,
integration_identifier: None,
credentials: Credentials {
webhook_url: "".to_string(),
device_tokens: Some(
push_device_tokens
.into_iter()
.map(|t| t.to_string())
.collect(),
),
channel: None,
title: None,
image_url: None,
alert_uid: None,
state: None,
external_url: None,
},
},
)
.await?;
Ok(())
}

pub async fn trigger_email_workflow(
&self,
trigger_name: String,
recipient_email: String,
recipient_id: String,
) -> Result<(), ExecutorError> {
let payload = HashMap::new();
self.novu
.trigger(TriggerPayload {
name: trigger_name,
payload,
to: TriggerRecipientsType::Single(
TriggerRecipientBuilder::new(recipient_id)
.email(recipient_email)
.build(),
),
overrides: None,
})
self.fcm
.send(settings.push_device_tokens(), msg, event.deep_link())
.await?;
Ok(())
}

async fn create_subscriber_for_galoy_id(
&self,
user_id: &GaloyUserId,
) -> Result<(), ExecutorError> {
self.novu
.subscribers
.create(CreateSubscriberPayload {
first_name: None,
last_name: None,
email: None,
phone: None,
avatar: None,
subscriber_id: user_id.to_string(),
data: None,
})
.await?;
Ok(())
}
}
Loading

0 comments on commit aa0580e

Please sign in to comment.