From 566e5388fbd785d61efdc83613585a7a196867f9 Mon Sep 17 00:00:00 2001 From: KishenKumarrrrr Date: Fri, 22 Nov 2024 11:30:03 +0800 Subject: [PATCH] feat: add custom spans in trace --- .../middlewares/email-callback.middleware.ts | 69 ++++--- .../email/services/email-callback.service.ts | 3 + .../services/email-transactional.service.ts | 167 ++++++++-------- .../src/email/utils/callback/parsers/ses.ts | 183 ++++++++++-------- 4 files changed, 224 insertions(+), 198 deletions(-) diff --git a/backend/src/email/middlewares/email-callback.middleware.ts b/backend/src/email/middlewares/email-callback.middleware.ts index 3df91d7ae..99915e67e 100644 --- a/backend/src/email/middlewares/email-callback.middleware.ts +++ b/backend/src/email/middlewares/email-callback.middleware.ts @@ -1,6 +1,7 @@ import { Request, Response, NextFunction } from 'express' import { EmailCallbackService } from '@email/services' import { loggerWithLabel } from '@core/logger' +import { tracer } from 'dd-trace' const logger = loggerWithLabel(module) @@ -9,22 +10,24 @@ const isAuthenticated = ( res: Response, next: NextFunction ): Response | void => { - const authHeader = req.get('authorization') - if (!authHeader) { - // SNS will send 2 request: - // - first one without the basic authorization first and require the callback - // server to respond with 401 WWW-Authenticate Basic realm="Email" - // - second one with the basic authorization - // The above mechanism is based on RFC-2671 https://www.rfc-editor.org/rfc/rfc2617.html#page-8 - // Of course, this middleare is to reject all requests without the - // Authorization header as well - res.set('WWW-Authenticate', 'Basic realm="Email"') - return res.sendStatus(401) - } - if (EmailCallbackService.isAuthenticated(authHeader)) { - return next() - } - return res.sendStatus(403) + tracer.wrap('isAuthenticated', () => { + const authHeader = req.get('authorization') + if (!authHeader) { + // SNS will send 2 request: + // - first one without the basic authorization first and require the callback + // server to respond with 401 WWW-Authenticate Basic realm="Email" + // - second one with the basic authorization + // The above mechanism is based on RFC-2671 https://www.rfc-editor.org/rfc/rfc2617.html#page-8 + // Of course, this middleare is to reject all requests without the + // Authorization header as well + res.set('WWW-Authenticate', 'Basic realm="Email"') + return res.sendStatus(401) + } + if (EmailCallbackService.isAuthenticated(authHeader)) { + return next() + } + return res.sendStatus(403) + }) } const parseEvent = async ( @@ -45,23 +48,25 @@ const printConfirmSubscription = ( res: Response, next: NextFunction ): Response | void => { - const { Type: type, SubscribeURL: subscribeUrl } = JSON.parse(req.body) - if (type === 'SubscriptionConfirmation') { - const parsed = new URL(subscribeUrl) - if ( - parsed.protocol === 'https:' && - /^sns\.[a-zA-Z0-9-]{3,}\.amazonaws\.com(\.cn)?$/.test(parsed.host) - ) { - logger.info({ - message: 'Confirm the subscription', - type, - subscribeUrl, - action: 'printConfirmSubscription', - }) - return res.sendStatus(202) + tracer.wrap('printConfirmSubscription', () => { + const { Type: type, SubscribeURL: subscribeUrl } = JSON.parse(req.body) + if (type === 'SubscriptionConfirmation') { + const parsed = new URL(subscribeUrl) + if ( + parsed.protocol === 'https:' && + /^sns\.[a-zA-Z0-9-]{3,}\.amazonaws\.com(\.cn)?$/.test(parsed.host) + ) { + logger.info({ + message: 'Confirm the subscription', + type, + subscribeUrl, + action: 'printConfirmSubscription', + }) + return res.sendStatus(202) + } } - } - return next() + return next() + }) } export const EmailCallbackMiddleware = { isAuthenticated, diff --git a/backend/src/email/services/email-callback.service.ts b/backend/src/email/services/email-callback.service.ts index 62538c2c3..231c47bbb 100644 --- a/backend/src/email/services/email-callback.service.ts +++ b/backend/src/email/services/email-callback.service.ts @@ -2,6 +2,7 @@ import { Request } from 'express' import { ses, sendgrid } from '@email/utils/callback/parsers' import config from '@core/config' import { loggerWithLabel } from '@core/logger' +import { tracer } from 'dd-trace' const logger = loggerWithLabel(module) @@ -24,7 +25,9 @@ const isAuthenticated = (authHeader?: string): boolean => { } const parseEvent = async (req: Request): Promise => { + const parseJsonSpan = tracer.startSpan('parseJson') const parsed = JSON.parse(req.body) + parseJsonSpan.finish() let records: Promise[] = [] if (ses.isEvent(req)) { // body could be one record or an array of records, hence we concat diff --git a/backend/src/email/services/email-transactional.service.ts b/backend/src/email/services/email-transactional.service.ts index fb7afb79d..3714587a9 100644 --- a/backend/src/email/services/email-transactional.service.ts +++ b/backend/src/email/services/email-transactional.service.ts @@ -16,6 +16,7 @@ import { } from '@core/constants' import { Order } from 'sequelize/types/model' import { Op, WhereOptions } from 'sequelize' +import tracer from 'dd-trace' const logger = loggerWithLabel(module) @@ -130,103 +131,107 @@ async function handleStatusCallbacks( id: string, metadata: CallbackMetaData ): Promise { - const emailMessageTransactional = await EmailMessageTransactional.findByPk(id) - if (!emailMessageTransactional) { - throw new Error(`Failed to find emailMessageTransactional for id: ${id}`) - } + tracer.wrap('handleStatusCallbacks', async () => { + const emailMessageTransactional = await EmailMessageTransactional.findByPk( + id + ) + if (!emailMessageTransactional) { + throw new Error(`Failed to find emailMessageTransactional for id: ${id}`) + } - const mainRecipientDelivered = metadata.delivery?.recipients?.find( - (e) => e === emailMessageTransactional.recipient - ) - const mainRecipientBounced = metadata.bounce?.bouncedRecipients?.find( - (e) => e.emailAddress === emailMessageTransactional.recipient - ) - const mainRecipientComplained = - metadata.complaint?.complainedRecipients?.find( + const mainRecipientDelivered = metadata.delivery?.recipients?.find( + (e) => e === emailMessageTransactional.recipient + ) + const mainRecipientBounced = metadata.bounce?.bouncedRecipients?.find( (e) => e.emailAddress === emailMessageTransactional.recipient ) + const mainRecipientComplained = + metadata.complaint?.complainedRecipients?.find( + (e) => e.emailAddress === emailMessageTransactional.recipient + ) - switch (type) { - case SesEventType.Delivery: - if (mainRecipientDelivered) { - await EmailMessageTransactional.update( - { - status: TransactionalEmailMessageStatus.Delivered, - deliveredAt: metadata.timestamp, - }, - { - where: { id }, - } - ) - } - break - case SesEventType.Bounce: - // check that bounce applies to the main recipient - if (mainRecipientBounced) { + switch (type) { + case SesEventType.Delivery: + if (mainRecipientDelivered) { + await EmailMessageTransactional.update( + { + status: TransactionalEmailMessageStatus.Delivered, + deliveredAt: metadata.timestamp, + }, + { + where: { id }, + } + ) + } + break + case SesEventType.Bounce: + // check that bounce applies to the main recipient + if (mainRecipientBounced) { + await EmailMessageTransactional.update( + { + status: TransactionalEmailMessageStatus.Bounced, + errorCode: + metadata.bounce?.bounceType === 'Permanent' + ? 'Hard bounce' + : 'Soft bounce', + errorSubType: metadata.bounce?.bounceSubType, + }, + { + where: { id }, + } + ) + } + break + case SesEventType.Complaint: + // check that complaint applies to the main recipient + if (mainRecipientComplained) { + await EmailMessageTransactional.update( + { + status: TransactionalEmailMessageStatus.Complaint, + errorCode: metadata.complaint?.complaintFeedbackType, + errorSubType: metadata.complaint?.complaintSubType, + }, + { + where: { id }, + } + ) + } + break + case SesEventType.Open: + // Cannot check that open applies to the main recipient + // we only update the DB if there was no previous error await EmailMessageTransactional.update( { - status: TransactionalEmailMessageStatus.Bounced, - errorCode: - metadata.bounce?.bounceType === 'Permanent' - ? 'Hard bounce' - : 'Soft bounce', - errorSubType: metadata.bounce?.bounceSubType, + status: TransactionalEmailMessageStatus.Opened, + openedAt: metadata.timestamp, }, { - where: { id }, + where: { id, errorCode: null }, } ) - } - break - case SesEventType.Complaint: - // check that complaint applies to the main recipient - if (mainRecipientComplained) { + break + case SesEventType.Send: + // Cannot check that send applies to the main recipient + // we only update the DB if there was no previous error await EmailMessageTransactional.update( { - status: TransactionalEmailMessageStatus.Complaint, - errorCode: metadata.complaint?.complaintFeedbackType, - errorSubType: metadata.complaint?.complaintSubType, + status: TransactionalEmailMessageStatus.Sent, + sentAt: metadata.timestamp, }, { - where: { id }, + where: { id, errorCode: null }, } ) - } - break - case SesEventType.Open: - // Cannot check that open applies to the main recipient - // we only update the DB if there was no previous error - await EmailMessageTransactional.update( - { - status: TransactionalEmailMessageStatus.Opened, - openedAt: metadata.timestamp, - }, - { - where: { id, errorCode: null }, - } - ) - break - case SesEventType.Send: - // Cannot check that send applies to the main recipient - // we only update the DB if there was no previous error - await EmailMessageTransactional.update( - { - status: TransactionalEmailMessageStatus.Sent, - sentAt: metadata.timestamp, - }, - { - where: { id, errorCode: null }, - } - ) - break - default: - logger.warn({ - message: 'Unable to handle messages with this type', - type, - id, - metadata, - }) - } + break + default: + logger.warn({ + message: 'Unable to handle messages with this type', + type, + id, + metadata, + }) + } + }) } async function listMessages({ diff --git a/backend/src/email/utils/callback/parsers/ses.ts b/backend/src/email/utils/callback/parsers/ses.ts index 44269a160..71e948119 100644 --- a/backend/src/email/utils/callback/parsers/ses.ts +++ b/backend/src/email/utils/callback/parsers/ses.ts @@ -14,6 +14,7 @@ import config from '@core/config' import { compareSha256Hash } from '@shared/utils/crypto' import { EmailTransactionalService } from '@email/services/email-transactional.service' import { SesEventType, Metadata } from '@email/interfaces/callback.interface' +import { tracer } from 'dd-trace' const logger = loggerWithLabel(module) const REFERENCE_ID_HEADER_V2 = 'X-SMTPAPI' // Case sensitive @@ -137,45 +138,47 @@ const parseNotificationAndEvent = async ( message: any, metadata: Metadata ): Promise => { - if (!isNotificationAndEventForMainRecipient(message, type)) { - logger.info({ - message: 'SES notification or event is not for the main recipient', - action: 'filterNotification', - body: message, - }) - return - } - switch (type) { - case SesEventType.Delivery: - await updateDeliveredStatus(metadata) - break - case SesEventType.Bounce: - await updateBouncedStatus({ - ...metadata, - bounceType: message?.bounce?.bounceType, - bounceSubType: message?.bounce?.bounceSubType, - to: message?.mail?.commonHeaders?.to, - }) - break - case SesEventType.Complaint: - await updateComplaintStatus({ - ...metadata, - complaintType: message?.complaint?.complaintFeedbackType, - complaintSubType: message?.complaint?.complaintSubType, - to: message?.mail?.commonHeaders?.to, - }) - break - case SesEventType.Open: - await updateReadStatus(metadata) - break - default: - logger.warn({ - message: 'Unable to handle messages with this type', - action: 'parseNotification', - type, + tracer.wrap('parseNotificationAndEvent', async () => { + if (!isNotificationAndEventForMainRecipient(message, type)) { + logger.info({ + message: 'SES notification or event is not for the main recipient', + action: 'filterNotification', + body: message, }) return - } + } + switch (type) { + case SesEventType.Delivery: + await updateDeliveredStatus(metadata) + break + case SesEventType.Bounce: + await updateBouncedStatus({ + ...metadata, + bounceType: message?.bounce?.bounceType, + bounceSubType: message?.bounce?.bounceSubType, + to: message?.mail?.commonHeaders?.to, + }) + break + case SesEventType.Complaint: + await updateComplaintStatus({ + ...metadata, + complaintType: message?.complaint?.complaintFeedbackType, + complaintSubType: message?.complaint?.complaintSubType, + to: message?.mail?.commonHeaders?.to, + }) + break + case SesEventType.Open: + await updateReadStatus(metadata) + break + default: + logger.warn({ + message: 'Unable to handle messages with this type', + action: 'parseNotification', + type, + }) + return + } + }) } // Validate SES record hash, returns message ID if valid, otherwise throw errors @@ -183,28 +186,30 @@ const validateRecord = async ( record: SesRecord, smtpHeader: SmtpApiHeader | undefined ) => { - const username = smtpHeader?.auth?.username - const hash = smtpHeader?.auth?.hash - if ( - !username || - !hash || - !compareSha256Hash(config.get('emailCallback.hashSecret'), username, hash) - ) { - logger.info({ - message: 'Incorrect email callback hash', - username, - timestamp: record.Timestamp, - hash, - }) - - // if not passed with the new hash, retry with the old way - // TODO: remove this after all campaigns sent with the old way have completed + tracer.wrap('validateRecord', async () => { + const username = smtpHeader?.auth?.username + const hash = smtpHeader?.auth?.hash if ( - !(record.SignatureVersion === '1' && (await validateSignature(record))) + !username || + !hash || + !compareSha256Hash(config.get('emailCallback.hashSecret'), username, hash) ) { - throw new Error('Invalid record') + logger.info({ + message: 'Incorrect email callback hash', + username, + timestamp: record.Timestamp, + hash, + }) + + // if not passed with the new hash, retry with the old way + // TODO: remove this after all campaigns sent with the old way have completed + if ( + !(record.SignatureVersion === '1' && (await validateSignature(record))) + ) { + throw new Error('Invalid record') + } } - } + }) } const blacklistIfNeeded = async (message: any): Promise => { const notificationType = message?.notificationType || message?.eventType @@ -223,40 +228,48 @@ const blacklistIfNeeded = async (message: any): Promise => { } } const parseRecord = async (record: SesRecord): Promise => { - logger.info({ - message: 'Parsing SES callback record', - }) - const message = JSON.parse(record.Message) - const smtpApiHeader = getSmtpApiHeader(message) - await validateRecord(record, smtpApiHeader) + tracer.wrap('parseRecord', async () => { + logger.info({ + message: 'Parsing SES callback record', + }) + const parseRecordJson = tracer.startSpan('parseRecordJson') + const message = JSON.parse(record.Message) + parseRecordJson.finish() + const smtpApiHeader = getSmtpApiHeader(message) + await validateRecord(record, smtpApiHeader) - // Transactional emails don't have message IDs, so blacklist - // relevant email addresses before everything else - await blacklistIfNeeded(message) + // Transactional emails don't have message IDs, so blacklist + // relevant email addresses before everything else + await blacklistIfNeeded(message) - // primary key - const messageId = smtpApiHeader?.unique_args?.message_id - const isTransactional = smtpApiHeader?.isTransactional - const type = message?.notificationType || message?.eventType + // primary key + const messageId = smtpApiHeader?.unique_args?.message_id + const isTransactional = smtpApiHeader?.isTransactional + const type = message?.notificationType || message?.eventType - if (messageId && type) { - const metadata = { messageId, timestamp: record.Timestamp } - logger.info({ - message: 'Update for notification/event type', - action: 'parseRecord', - messageId, - type, - }) - if (isTransactional) { - return EmailTransactionalService.handleStatusCallbacks(type, messageId, { - timestamp: new Date(record.Timestamp), - bounce: message.bounce, - complaint: message.complaint, - delivery: message.delivery, + if (messageId && type) { + const metadata = { messageId, timestamp: record.Timestamp } + logger.info({ + message: 'Update for notification/event type', + action: 'parseRecord', + messageId, + type, }) + if (isTransactional) { + return EmailTransactionalService.handleStatusCallbacks( + type, + messageId, + { + timestamp: new Date(record.Timestamp), + bounce: message.bounce, + complaint: message.complaint, + delivery: message.delivery, + } + ) + } + return parseNotificationAndEvent(type, message, metadata) } - return parseNotificationAndEvent(type, message, metadata) - } + }) } // Checks whether the notification/event is meant for the main recipient of the email.