diff --git a/workers/documents.js b/workers/documents.js index 1f6ce1b7..26f6b1b9 100644 --- a/workers/documents.js +++ b/workers/documents.js @@ -10,6 +10,10 @@ const crypto = require('crypto'); const { readEnvValue, threadStats, getDuration } = require('../lib/tools'); +const GB_COLLECT_DELAY = 100; // 6 * 3600 * 1000; // 6h +const GB_FAILURE_DELAY = 3 * 1000; +const GB_EMPTY_DELAY = 10 * 1000; + const Bugsnag = require('@bugsnag/js'); if (readEnvValue('BUGSNAG_API_KEY')) { Bugsnag.start({ @@ -597,13 +601,28 @@ const documentsWorker = new Worker( let deleteResult = null; try { + let messageIdHeader; + try { + // resolve Message-ID value for deleted email + let getResult = await client.get({ + index, + id: `${job.data.account}:${messageId}`, + _source_includes: ['messageId'] + }); + messageIdHeader = (getResult?._source?.messageId || '').toString().trim(); + } catch (err) { + logger.error({ msg: 'Failed to retrieve Message-ID for deleted email', account: job.data.account, message: messageId, err }); + } + deleteResult = await client.delete({ index, id: `${job.data.account}:${messageId}` }); // add to embeddings delete queue - await redis.zadd(`${REDIS_PREFIX}expungequeue`, Date.now(), `${job.data.account}:${messageId}`); + if (messageIdHeader) { + await redis.zadd(`${REDIS_PREFIX}expungequeue`, Date.now(), `${job.data.account}:${messageIdHeader}`); + } } catch (err) { switch (err.meta && err.meta.body && err.meta.body.result) { case 'not_found': @@ -918,4 +937,154 @@ documentsWorker.on('failed', async job => { }); }); +const clearExpungedEmbeddings = async () => { + const { index, client } = await getESClient(logger); + let rangeEnd = Date.now() - GB_COLLECT_DELAY; + try { + let expungedEntry = await redis.zrange(`${REDIS_PREFIX}expungequeue`, 0, rangeEnd, 'BYSCORE', 'LIMIT', '0', '1'); + + if (!expungedEntry?.length) { + await new Promise(resolve => { + let timer = setTimeout(resolve, GB_EMPTY_DELAY); + timer.unref(); + }); + return; + } + + if (expungedEntry && expungedEntry.length) { + expungedEntry = expungedEntry[0]; + let [account, ...messageId] = expungedEntry.split(':'); + if (messageId) { + messageId = messageId.join(':'); + } + + if (account && messageId) { + let matchQuery = { + bool: { + must: [ + { + term: { + account + } + }, + { + term: { + messageId + } + } + ] + } + }; + + let existingResult; + + try { + existingResult = await client.search({ + index, + size: 1, + query: matchQuery, + _source: false + }); + + if (!existingResult || !existingResult.hits) { + logger.error({ + msg: 'Failed to run query to find emails by Message-ID. Empty result.', + account, + messageId, + existingResult + }); + throw new Error('Empty result'); + } + } catch (err) { + logger.error({ + msg: 'Failed to run query to find emails by Message-ID', + account, + messageId, + err + }); + throw err; + } + + if (existingResult?.hits?.total?.value === 0) { + // can purge embeddings + logger.trace({ + msg: 'Deleting embeddings for a missing email', + account, + messageId + }); + try { + let deleteResult = await client.deleteByQuery({ + index: `${index}.embeddings`, + query: { + bool: { + must: [ + { + term: { + account + } + }, + + { + term: { + messageId + } + } + ] + } + } + }); + if (deleteResult?.deleted) { + logger.info({ + msg: 'Deleted embeddings for a missing email', + account, + messageId, + deleted: deleteResult?.deleted + }); + } + // clear existing entry + await redis.zrem(`${REDIS_PREFIX}expungequeue`, `${account}:${messageId}`); + logger.trace({ + msg: 'Removed entry from expunge queue', + account, + messageId + }); + } catch (err) { + logger.info({ + msg: 'Dailed to delete embeddings for a missing email', + account, + messageId, + err + }); + } + } else { + // clear existing entry + await redis.zrem(`${REDIS_PREFIX}expungequeue`, `${account}:${messageId}`); + logger.trace({ + msg: 'Removed still existing entry from expunge queue', + account, + messageId + }); + } + } + } + } catch (err) { + logger.error({ msg: 'Failed to retrieve expunged entries', rangeStart: 0, rangeEnd, err }); + await new Promise(resolve => { + let timer = setTimeout(resolve, GB_FAILURE_DELAY); + timer.unref(); + }); + return; + } +}; + +function runGarbageCollector() { + clearExpungedEmbeddings() + .catch(err => { + logger.error({ msg: 'Failed to run garbage collector for embeddings', err }); + }) + .finally(() => runGarbageCollector()); +} + logger.info({ msg: 'Started Documents worker thread', version: packageData.version }); + +runGarbageCollector();