Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Send webhooks for invoice events #425

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 55 additions & 16 deletions modules/meteroid/crates/meteroid-store/src/domain/outbox_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::domain::enums::BillingPeriodEnum;
use crate::domain::{Address, Customer, ShippingAddress, Subscription};
use crate::domain::enums::{BillingPeriodEnum, InvoiceStatusEnum};
use crate::domain::{Address, Customer, DetailedInvoice, ShippingAddress, Subscription};
use crate::errors::{StoreError, StoreErrorReport};
use crate::utils::local_id::{IdType, LocalId};
use crate::StoreResult;
Expand All @@ -18,7 +18,7 @@ pub struct OutboxEvent {
}

impl OutboxEvent {
pub fn customer_created(event: CustomerCreatedEvent) -> OutboxEvent {
pub fn customer_created(event: CustomerEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
Expand All @@ -34,15 +34,23 @@ impl OutboxEvent {
}
}

pub fn invoice_finalized(tenant_id: Uuid, invoice_id: Uuid) -> OutboxEvent {
pub fn invoice_created(event: InvoiceEvent) -> OutboxEvent {
OutboxEvent {
tenant_id,
aggregate_id: invoice_id,
event_type: EventType::InvoiceFinalized,
tenant_id: event.tenant_id,
aggregate_id: event.id,
event_type: EventType::InvoiceCreated(Box::new(event)),
}
}

pub fn subscription_created(event: SubscriptionCreatedEvent) -> OutboxEvent {
pub fn invoice_finalized(event: InvoiceEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
event_type: EventType::InvoiceFinalized(Box::new(event)),
}
}

pub fn subscription_created(event: SubscriptionEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
Expand All @@ -53,7 +61,8 @@ impl OutboxEvent {
fn payload_json(&self) -> StoreResult<Option<serde_json::Value>> {
match &self.event_type {
EventType::CustomerCreated(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoiceFinalized => Ok(None),
EventType::InvoiceCreated(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoiceFinalized(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoicePdfRequested => Ok(None),
EventType::SubscriptionCreated(event) => Ok(Some(Self::event_json(event)?)),
}
Expand All @@ -75,21 +84,23 @@ impl OutboxEvent {
#[derive(Display)]
pub enum EventType {
#[strum(serialize = "customer.created")]
CustomerCreated(Box<CustomerCreatedEvent>),
CustomerCreated(Box<CustomerEvent>),
#[strum(serialize = "invoice.created")]
InvoiceCreated(Box<InvoiceEvent>),
#[strum(serialize = "invoice.finalized")]
/// todo this needs payload as well
InvoiceFinalized,
InvoiceFinalized(Box<InvoiceEvent>),
#[strum(serialize = "invoice.pdf.requested")]
InvoicePdfRequested,
#[strum(serialize = "subscription.created")]
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
SubscriptionCreated(Box<SubscriptionEvent>),
}

impl EventType {
pub fn aggregate_type(&self) -> String {
match self {
EventType::CustomerCreated(_) => "customer".to_string(),
EventType::InvoiceFinalized => "invoice".to_string(),
EventType::InvoiceCreated(_) => "invoice".to_string(),
EventType::InvoiceFinalized(_) => "invoice".to_string(),
EventType::InvoicePdfRequested => "invoice".to_string(),
EventType::SubscriptionCreated(_) => "subscription".to_string(),
}
Expand All @@ -113,7 +124,7 @@ impl TryInto<OutboxEventRowNew> for OutboxEvent {

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(Customer)]
pub struct CustomerCreatedEvent {
pub struct CustomerEvent {
pub id: Uuid,
pub local_id: String,
pub tenant_id: Uuid,
Expand All @@ -135,7 +146,7 @@ pub struct CustomerCreatedEvent {

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(Subscription)]
pub struct SubscriptionCreatedEvent {
pub struct SubscriptionEvent {
pub id: Uuid,
pub local_id: String,
pub tenant_id: Uuid,
Expand Down Expand Up @@ -171,3 +182,31 @@ pub struct SubscriptionCreatedEvent {
pub mrr_cents: u64,
pub period: BillingPeriodEnum,
}

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(DetailedInvoice)]
pub struct InvoiceEvent {
#[map(@.invoice.id)]
pub id: Uuid,
#[map(@.invoice.local_id)]
pub local_id: String,
#[map(@.invoice.status)]
pub status: InvoiceStatusEnum,
#[map(@.invoice.tenant_id)]
pub tenant_id: Uuid,
#[map(@.invoice.customer_id)]
pub customer_id: Uuid,
#[map(@.customer.local_id)]
pub customer_local_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[map(@.invoice.subscription_id)]
pub subscription_id: Option<Uuid>,
#[map(@.invoice.currency)]
pub currency: String,
#[map(@.invoice.tax_amount)]
pub tax_amount: i64,
#[map(@.invoice.total)]
pub total: i64,
#[map(@.invoice.created_at)]
pub created_at: NaiveDateTime,
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub struct WebhookOutMessageNew {
pub enum WebhookOutMessagePayload {
Customer(serde_json::Value),
Subscription(serde_json::Value),
Invoice(serde_json::Value),
}

impl TryFrom<WebhookOutMessageNew> for svix::api::MessageIn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::domain::{
};
use crate::errors::StoreError;
use crate::repositories::customer_balance::CustomerBalance;
use crate::repositories::invoices::insert_invoice;
use crate::repositories::invoices::insert_invoice_tx;
use crate::repositories::invoicing_entities::InvoicingEntityInterface;
use crate::repositories::InvoiceInterface;
use crate::store::Store;
Expand Down Expand Up @@ -442,7 +442,7 @@ impl CustomersInterface for Store {
},
};

let inserted_invoice = insert_invoice(conn, invoice_new).await?;
let inserted_invoice = insert_invoice_tx(self, conn, invoice_new).await?;

InvoicingEntityRow::update_invoicing_entity_number(
conn,
Expand Down
85 changes: 53 additions & 32 deletions modules/meteroid/crates/meteroid-store/src/repositories/invoices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,34 +151,17 @@ impl InvoiceInterface for Store {
}

async fn insert_invoice(&self, invoice: InvoiceNew) -> StoreResult<Invoice> {
let mut conn = self.get_conn().await?;

insert_invoice(&mut conn, invoice).await
self.transaction(|conn| {
async move { insert_invoice_tx(self, conn, invoice).await }.scope_boxed()
})
.await
}

async fn insert_invoice_batch(&self, invoice: Vec<InvoiceNew>) -> StoreResult<Vec<Invoice>> {
let mut conn = self.get_conn().await?;

let insertable_invoice: Vec<InvoiceRowNew> = invoice
.into_iter()
.map(|c| c.try_into())
.collect::<Result<_, _>>()?;

let inserted: Vec<Invoice> =
InvoiceRow::insert_invoice_batch(&mut conn, insertable_invoice)
.await
.map_err(Into::<Report<StoreError>>::into)
.and_then(|v| {
v.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, Report<StoreError>>>()
})?;

for inv in &inserted {
process_mrr(inv, &mut conn).await?; // TODO batch
}

Ok(inserted)
self.transaction(|conn| {
async move { insert_invoice_batch_tx(self, conn, invoice).await }.scope_boxed()
})
.await
}

async fn update_invoice_external_status(
Expand Down Expand Up @@ -300,10 +283,15 @@ impl InvoiceInterface for Store {
.await
.map_err(Into::<Report<StoreError>>::into)?;

let final_invoice: DetailedInvoice = InvoiceRow::find_by_id(conn, tenant_id, id)
.await
.map_err(Into::into)
.and_then(|row| row.try_into())?;

self.internal
.insert_outbox_events_tx(
conn,
vec![OutboxEvent::invoice_finalized(tenant_id, id)],
vec![OutboxEvent::invoice_finalized(final_invoice.into())],
)
.await?;

Expand Down Expand Up @@ -581,16 +569,49 @@ async fn compute_invoice_patch(
}
}

pub async fn insert_invoice(conn: &mut PgConn, invoice: InvoiceNew) -> StoreResult<Invoice> {
let insertable_invoice: InvoiceRowNew = invoice.try_into()?;
pub async fn insert_invoice_tx(
store: &Store,
tx: &mut PgConn,
invoice: InvoiceNew,
) -> StoreResult<Invoice> {
insert_invoice_batch_tx(store, tx, vec![invoice])
.await?
.pop()
.ok_or(StoreError::InsertError.into())
}

let inserted: Invoice = insertable_invoice
.insert(conn)
async fn insert_invoice_batch_tx(
store: &Store,
tx: &mut PgConn,
invoice: Vec<InvoiceNew>,
) -> StoreResult<Vec<Invoice>> {
let insertable_invoice: Vec<InvoiceRowNew> = invoice
.into_iter()
.map(|c| c.try_into())
.collect::<Result<_, _>>()?;

let inserted: Vec<Invoice> = InvoiceRow::insert_invoice_batch(tx, insertable_invoice)
.await
.map_err(Into::<Report<StoreError>>::into)
.and_then(TryInto::try_into)?;
.and_then(|v| {
v.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, Report<StoreError>>>()
})?;

// TODO batch
for inv in &inserted {
process_mrr(inv, tx).await?;
let final_invoice: DetailedInvoice = InvoiceRow::find_by_id(tx, inv.tenant_id, inv.id)
.await
.map_err(Into::into)
.and_then(|row| row.try_into())?;

process_mrr(&inserted, conn).await?;
store
.internal
.insert_outbox_events_tx(tx, vec![OutboxEvent::invoice_created(final_invoice.into())])
.await?;
}

Ok(inserted)
}
Expand Down
22 changes: 15 additions & 7 deletions modules/meteroid/src/workers/kafka/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use meteroid_store::domain::outbox_event::{CustomerCreatedEvent, SubscriptionCreatedEvent};
use meteroid_store::domain::outbox_event::{CustomerEvent, InvoiceEvent, SubscriptionEvent};
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
use rdkafka::Message;
use serde::Deserialize;
Expand All @@ -16,10 +16,11 @@ pub struct OutboxEvent {

#[derive(Debug)]
pub enum EventType {
CustomerCreated(Box<CustomerCreatedEvent>),
InvoiceFinalized,
CustomerCreated(Box<CustomerEvent>),
InvoiceCreated(Box<InvoiceEvent>),
InvoiceFinalized(Box<InvoiceEvent>),
InvoicePdfRequested,
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
SubscriptionCreated(Box<SubscriptionEvent>),
}

impl EventType {
Expand All @@ -31,13 +32,20 @@ impl EventType {

match event_type.as_str() {
"customer.created" => {
let payload = extract_payload::<CustomerCreatedEvent>(m).ok()??;
let payload = extract_payload::<CustomerEvent>(m).ok()??;
Some(Self::CustomerCreated(Box::new(payload)))
}
"invoice.finalized" => Some(Self::InvoiceFinalized),
"invoice.created" => {
let payload = extract_payload::<InvoiceEvent>(m).ok()??;
Some(Self::InvoiceCreated(Box::new(payload)))
}
"invoice.finalized" => {
let payload = extract_payload::<InvoiceEvent>(m).ok()??;
Some(Self::InvoiceFinalized(Box::new(payload)))
}
"invoice.pdf.requested" => Some(Self::InvoicePdfRequested),
"subscription.created" => {
let payload = extract_payload::<SubscriptionCreatedEvent>(m).ok()??;
let payload = extract_payload::<SubscriptionEvent>(m).ok()??;
Some(Self::SubscriptionCreated(Box::new(payload)))
}
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion modules/meteroid/src/workers/kafka/pdf_renderer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl MessageHandler for PdfRendererHandler {
log::info!("Processing message: {:?}", event);

match event.event_type {
EventType::InvoiceFinalized | EventType::InvoicePdfRequested => {
EventType::InvoiceFinalized(_) | EventType::InvoicePdfRequested => {
let invoice_id: Uuid = parse_uuid(event.aggregate_id.as_str(), "aggregate_id")?;

let result = self.pdf_service.generate_pdfs(vec![invoice_id]).await;
Expand Down
6 changes: 5 additions & 1 deletion modules/meteroid/src/workers/kafka/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use meteroid_store::Store;
use std::sync::Arc;

pub async fn run_webhook_outbox_processor(kafka_config: &KafkaConnectionConfig, store: Arc<Store>) {
let topics = vec!["outbox.event.customer", "outbox.event.subscription"];
let topics = vec![
"outbox.event.customer",
"outbox.event.subscription",
"outbox.event.invoice",
];
let group_id = "webhook_outbox_processor";

let handler = Arc::new(WebhookHandler::new(store));
Expand Down
Loading
Loading