Skip to content

Commit

Permalink
Updated fetchMissing to use db-persisted values - this helps tremendo…
Browse files Browse the repository at this point in the history
…usly with handles reboots to ensure we re-fetch every event to capture missing events
  • Loading branch information
9larsons committed Sep 3, 2024
1 parent 719316d commit d797848
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
57 changes: 41 additions & 16 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;
/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */

/**
* Creates a job in the jobs table if it does not already exist.
* @param {EmailAnalyticsJobName} jobName - The name of the job to create.
* @returns {Promise<void>}
*/
async function createJobIfNotExists(jobName) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
started_at: new Date(),
created_at: new Date(),
status: 'started'
}).onConflict('name').ignore();
}

module.exports = {
async shouldFetchStats() {
// don't fetch stats from Mailgun if we haven't sent any emails
Expand All @@ -28,15 +43,13 @@ module.exports = {
let maxOpenedAt;
let maxDeliveredAt;
let maxFailedAt;
const lastJobRunTimestamp = await this.getLastJobRunTimestamp(jobName);

const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();

if (jobData) {
if (lastJobRunTimestamp) {
debug(`Using job data for ${jobName}`);
const lastJobTimestamp = jobData.finished_at || jobData.started_at;
maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null;
maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null;
maxFailedAt = events.includes('failed') ? lastJobTimestamp : null;
maxOpenedAt = events.includes('opened') ? lastJobRunTimestamp : null;
maxDeliveredAt = events.includes('delivered') ? lastJobRunTimestamp : null;
maxFailedAt = events.includes('failed') ? lastJobRunTimestamp : null;
} else {
debug(`Job data not found for ${jobName}, using email_recipients data`);
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
Expand All @@ -50,14 +63,7 @@ module.exports = {
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
}

// Insert a new job row if it doesn't exist
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
started_at: new Date(),
created_at: new Date(),
status: 'started'
}).onConflict('name').ignore();
await createJobIfNotExists(jobName);
}

// Convert string dates to Date objects for SQLite compatibility
Expand All @@ -66,11 +72,30 @@ module.exports = {
));

const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
debug(`getLastEventTimestamp: finished in ${Date.now() - startDate}ms`);

return lastSeenEventTimestamp;
},

/**
* Retrieves the job data for the specified job name.
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve data for.
* @returns {Promise<Object|null>} The job data, or null if no job data is found.
*/
async getJobData(jobName) {
return await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();
},

/**
* Retrieves the timestamp of the last job run for the specified job name.
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve the last run timestamp for.
* @returns {Promise<Date|null>} The timestamp of the last job run, or null if no job data is found.
*/
async getLastJobRunTimestamp(jobName) {
const jobData = await this.getJobData(jobName);
return jobData ? jobData.finished_at || jobData.started_at : null;
},

/**
* Sets the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
Expand Down
12 changes: 8 additions & 4 deletions ghost/email-analytics-service/lib/EmailAnalyticsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,20 @@ module.exports = class EmailAnalyticsService {
return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
}

/**
* Returns the timestamp of the last missing event we processed. Defaults to now minus 2h if we have no data yet.
*/
async getLastMissingEventTimestamp() {
return this.#fetchMissingData?.lastEventTimestamp ?? (await this.queries.getLastJobRunTimestamp(this.#fetchMissingData.jobName)) ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 4);
}

/**
* Fetches the latest opened events.
* @param {Object} options - The options for fetching events.
* @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch.
* @returns {Promise<number>} The total number of events fetched.
*/
async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) {
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
const begin = await this.getLastOpenedEventTimestamp();
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage

Expand All @@ -132,7 +138,6 @@ module.exports = class EmailAnalyticsService {
* @returns {Promise<number>} The total number of events fetched.
*/
async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) {
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
const begin = await this.getLastNonOpenedEventTimestamp();
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage

Expand All @@ -151,8 +156,7 @@ module.exports = class EmailAnalyticsService {
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
*/
async fetchMissing({maxEvents = Infinity} = {}) {
// We start where we left of, or 1,5h ago after a server restart
const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3);
const begin = await this.getLastMissingEventTimestamp();

// Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago
const end = new Date(
Expand Down

0 comments on commit d797848

Please sign in to comment.