Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented email analytics prioritizing email opens #20914

Merged
merged 15 commits into from
Sep 5, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,30 @@ class EmailAnalyticsServiceWrapper {
});
}

async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');
async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatest({maxEvents});
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
return totalEvents;
}

async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest non-opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}

async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing started');
logging.info('[EmailAnalytics] Fetch missing events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
Expand All @@ -83,7 +94,7 @@ class EmailAnalyticsServiceWrapper {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled started');
logging.info('[EmailAnalytics] Fetch scheduled events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
Expand All @@ -100,13 +111,34 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;

// NOTE: Data shows we can process ~2500 events per minute on Pro for a large-ish db (150k members).
// This can vary locally, but we should be conservative with the number of events we fetch.
try {
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});

// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});

// Prioritize opens since they are the most important (only data directly displayed to users)
const c1 = await this.fetchLatestOpenedEvents({maxEvents: 10000});
if (c1 >= 10000) {
cmraible marked this conversation as resolved.
Show resolved Hide resolved
this._restartFetch('high opened event count');
return;
}

// Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send)
// we want to make sure we don't spend too much time collecting delivery data.
const c2 = await this.fetchLatestNonOpenedEvents({maxEvents: 10000 - c1});
cmraible marked this conversation as resolved.
Show resolved Hide resolved
const c3 = await this.fetchMissing({maxEvents: 10000 - c1 - c2});

// Always restart immediately instead of waiting for the next scheduled job if we're fetching a lot of events
if ((c1 + c2 + c3) > 10000) {
this._restartFetch('high event count');
return;
}

// Only backfill if we're not currently fetching a lot of events
const c4 = await this.fetchScheduled({maxEvents: 10000});
if (c4 > 0) {
this._restartFetch('scheduled backfill');
return;
}

this.fetching = false;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
Expand All @@ -116,6 +148,12 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = false;
}

_restartFetch(reason) {
this.fetching = false;
logging.info(`[EmailAnalytics] Restarting fetch due to ${reason}`);
this.startFetch();
}
}

module.exports = EmailAnalyticsServiceWrapper;
138 changes: 117 additions & 21 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,141 @@
const _ = require('lodash');
const debug = require('@tryghost/debug')('services:email-analytics');
const db = require('../../../data/db');
const logging = require('@tryghost/logging');
const {default: ObjectID} = require('bson-objectid');

const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;

/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */

module.exports = {
async shouldFetchStats() {
// don't fetch stats from Mailgun if we haven't sent any emails
const [emailCount] = await db.knex('emails').count('id as count');
return emailCount && emailCount.count > 0;
},

async getLastSeenEventTimestamp() {
/**
* Retrieves the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider.
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
*/
async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) {
const startDate = new Date();

// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);

let maxOpenedAt;
let maxDeliveredAt;
let maxFailedAt;

const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();

if (jobData) {
debug(`Using job data for ${jobName}`);
const lastJobTimestamp = jobData.finished_at || jobData.started_at;
maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null;
maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null;
maxFailedAt = events.includes('failed') ? lastJobTimestamp : null;
} else {
debug(`Job data not found for ${jobName}, using email_recipients data`);
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
if (events.includes('opened')) {
maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt;
}
if (events.includes('delivered')) {
maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt;
}
if (events.includes('failed')) {
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
}

// Insert a new job row if it doesn't exist
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
started_at: new Date(),
created_at: new Date(),
status: 'started'
}).onConflict('name').ignore();
}

if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}

if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}
// Convert string dates to Date objects for SQLite compatibility
[maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => (
date && !(date instanceof Date) ? new Date(date) : date
));

const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

return lastSeenEventTimestamp;
},

/**
* Sets the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'completed'|'started'} field - The field to update.
* @param {Date} date - The timestamp of the last seen event.
* @returns {Promise<void>}
* @description
* Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp.
* This is used to keep track of the last time the job was run to avoid expensive queries following reboot.
*/
async setJobTimestamp(jobName, field, date) {
// Convert string dates to Date objects for SQLite compatibility
try {
debug(`Setting ${field} timestamp for job ${jobName} to ${date}`);
const updateField = field === 'completed' ? 'finished_at' : 'started_at';
const status = field === 'completed' ? 'finished' : 'started';
const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName);
if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
[updateField]: date,
updated_at: date,
status: status
});
}
} catch (err) {
debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`);
}
},

/**
* Sets the status of the specified email analytics job.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'started'|'finished'|'failed'} status - The new status of the job.
* @returns {Promise<void>}
* @description
* Updates the `status` column of the specified job in the `jobs` table with the provided status.
* This is used to keep track of the current state of the job.
*/
async setJobStatus(jobName, status) {
debug(`Setting status for job ${jobName} to ${status}`);
try {
const result = await db.knex('jobs')
.update({
status: status,
updated_at: new Date()
})
.where('name', jobName);

if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
status: status,
created_at: new Date(),
updated_at: new Date()
});
}
} catch (err) {
debug(`Error setting status for job ${jobName}: ${err.message}`);
throw err;
}
},

async aggregateEmailStats(emailId) {
const {totalCount} = await db.knex('emails').select(db.knex.raw('email_count as totalCount')).where('id', emailId).first() || {totalCount: 0};
// use IS NULL here because that will typically match far fewer rows than IS NOT NULL making the query faster
Expand Down Expand Up @@ -78,4 +174,4 @@ module.exports = {
.update(updateQuery)
.where('id', memberId);
}
};
};
Loading