diff --git a/ghost/core/core/server/services/email-analytics/lib/queries.js b/ghost/core/core/server/services/email-analytics/lib/queries.js index 93185f62aaf..dfd3ece7be6 100644 --- a/ghost/core/core/server/services/email-analytics/lib/queries.js +++ b/ghost/core/core/server/services/email-analytics/lib/queries.js @@ -9,6 +9,21 @@ const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5; /** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */ /** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */ +/** + * Creates a job in the jobs table if it does not already exist. + * @param {EmailAnalyticsJobName} jobName - The name of the job to create. + * @returns {Promise} + */ +async function createJobIfNotExists(jobName) { + await db.knex('jobs').insert({ + id: new ObjectID().toHexString(), + name: jobName, + started_at: new Date(), + created_at: new Date(), + status: 'started' + }).onConflict('name').ignore(); +} + module.exports = { async shouldFetchStats() { // don't fetch stats from Mailgun if we haven't sent any emails @@ -28,15 +43,13 @@ module.exports = { let maxOpenedAt; let maxDeliveredAt; let maxFailedAt; + const lastJobRunTimestamp = await this.getLastJobRunTimestamp(jobName); - const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first(); - - if (jobData) { + if (lastJobRunTimestamp) { debug(`Using job data for ${jobName}`); - const lastJobTimestamp = jobData.finished_at || jobData.started_at; - maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null; - maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null; - maxFailedAt = events.includes('failed') ? lastJobTimestamp : null; + maxOpenedAt = events.includes('opened') ? lastJobRunTimestamp : null; + maxDeliveredAt = events.includes('delivered') ? lastJobRunTimestamp : null; + maxFailedAt = events.includes('failed') ? lastJobRunTimestamp : null; } else { debug(`Job data not found for ${jobName}, using email_recipients data`); logging.info(`Job data not found for ${jobName}, using email_recipients data`); @@ -50,14 +63,7 @@ module.exports = { maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt; } - // Insert a new job row if it doesn't exist - await db.knex('jobs').insert({ - id: new ObjectID().toHexString(), - name: jobName, - started_at: new Date(), - created_at: new Date(), - status: 'started' - }).onConflict('name').ignore(); + await createJobIfNotExists(jobName); } // Convert string dates to Date objects for SQLite compatibility @@ -66,11 +72,30 @@ module.exports = { )); const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]); - debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`); + debug(`getLastEventTimestamp: finished in ${Date.now() - startDate}ms`); return lastSeenEventTimestamp; }, + /** + * Retrieves the job data for the specified job name. + * @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve data for. + * @returns {Promise} The job data, or null if no job data is found. + */ + async getJobData(jobName) { + return await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first(); + }, + + /** + * Retrieves the timestamp of the last job run for the specified job name. + * @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve the last run timestamp for. + * @returns {Promise} The timestamp of the last job run, or null if no job data is found. + */ + async getLastJobRunTimestamp(jobName) { + const jobData = await this.getJobData(jobName); + return jobData ? jobData.finished_at || jobData.started_at : null; + }, + /** * Sets the timestamp of the last seen event for the specified email analytics events. * @param {EmailAnalyticsJobName} jobName - The name of the job to update. diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index 589080b5cc2..1365ec2cf69 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -105,6 +105,13 @@ module.exports = class EmailAnalyticsService { return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); } + /** + * Returns the timestamp of the last missing event we processed. Defaults to now minus 2h if we have no data yet. + */ + async getLastMissingEventTimestamp() { + return this.#fetchMissingData?.lastEventTimestamp ?? (await this.queries.getLastJobRunTimestamp(this.#fetchMissingData.jobName)) ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 4); + } + /** * Fetches the latest opened events. * @param {Object} options - The options for fetching events. @@ -112,7 +119,6 @@ module.exports = class EmailAnalyticsService { * @returns {Promise} The total number of events fetched. */ async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) { - // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available const begin = await this.getLastOpenedEventTimestamp(); const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage @@ -132,7 +138,6 @@ module.exports = class EmailAnalyticsService { * @returns {Promise} The total number of events fetched. */ async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) { - // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available const begin = await this.getLastNonOpenedEventTimestamp(); const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage @@ -151,8 +156,7 @@ module.exports = class EmailAnalyticsService { * @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. */ async fetchMissing({maxEvents = Infinity} = {}) { - // We start where we left of, or 1,5h ago after a server restart - const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3); + const begin = await this.getLastMissingEventTimestamp(); // Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago const end = new Date(