Skip to content

Commit

Permalink
Remove embeddings for deleted emails
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Sep 27, 2023
1 parent da65a39 commit d3ec6f7
Showing 1 changed file with 170 additions and 1 deletion.
171 changes: 170 additions & 1 deletion workers/documents.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ const crypto = require('crypto');

const { readEnvValue, threadStats, getDuration } = require('../lib/tools');

const GB_COLLECT_DELAY = 100; // 6 * 3600 * 1000; // 6h
const GB_FAILURE_DELAY = 3 * 1000;
const GB_EMPTY_DELAY = 10 * 1000;

const Bugsnag = require('@bugsnag/js');
if (readEnvValue('BUGSNAG_API_KEY')) {
Bugsnag.start({
Expand Down Expand Up @@ -597,13 +601,28 @@ const documentsWorker = new Worker(
let deleteResult = null;

try {
let messageIdHeader;
try {
// resolve Message-ID value for deleted email
let getResult = await client.get({
index,
id: `${job.data.account}:${messageId}`,
_source_includes: ['messageId']
});
messageIdHeader = (getResult?._source?.messageId || '').toString().trim();
} catch (err) {
logger.error({ msg: 'Failed to retrieve Message-ID for deleted email', account: job.data.account, message: messageId, err });
}

deleteResult = await client.delete({
index,
id: `${job.data.account}:${messageId}`
});

// add to embeddings delete queue
await redis.zadd(`${REDIS_PREFIX}expungequeue`, Date.now(), `${job.data.account}:${messageId}`);
if (messageIdHeader) {
await redis.zadd(`${REDIS_PREFIX}expungequeue`, Date.now(), `${job.data.account}:${messageIdHeader}`);
}
} catch (err) {
switch (err.meta && err.meta.body && err.meta.body.result) {
case 'not_found':
Expand Down Expand Up @@ -918,4 +937,154 @@ documentsWorker.on('failed', async job => {
});
});

const clearExpungedEmbeddings = async () => {
const { index, client } = await getESClient(logger);
let rangeEnd = Date.now() - GB_COLLECT_DELAY;
try {
let expungedEntry = await redis.zrange(`${REDIS_PREFIX}expungequeue`, 0, rangeEnd, 'BYSCORE', 'LIMIT', '0', '1');

if (!expungedEntry?.length) {
await new Promise(resolve => {
let timer = setTimeout(resolve, GB_EMPTY_DELAY);
timer.unref();
});
return;
}

if (expungedEntry && expungedEntry.length) {
expungedEntry = expungedEntry[0];
let [account, ...messageId] = expungedEntry.split(':');
if (messageId) {
messageId = messageId.join(':');
}

if (account && messageId) {
let matchQuery = {
bool: {
must: [
{
term: {
account
}
},
{
term: {
messageId
}
}
]
}
};

let existingResult;

try {
existingResult = await client.search({
index,
size: 1,
query: matchQuery,
_source: false
});

if (!existingResult || !existingResult.hits) {
logger.error({
msg: 'Failed to run query to find emails by Message-ID. Empty result.',
account,
messageId,
existingResult
});
throw new Error('Empty result');
}
} catch (err) {
logger.error({
msg: 'Failed to run query to find emails by Message-ID',
account,
messageId,
err
});
throw err;
}

if (existingResult?.hits?.total?.value === 0) {
// can purge embeddings
logger.trace({
msg: 'Deleting embeddings for a missing email',
account,
messageId
});
try {
let deleteResult = await client.deleteByQuery({
index: `${index}.embeddings`,
query: {
bool: {
must: [
{
term: {
account
}
},

{
term: {
messageId
}
}
]
}
}
});
if (deleteResult?.deleted) {
logger.info({
msg: 'Deleted embeddings for a missing email',
account,
messageId,
deleted: deleteResult?.deleted
});
}
// clear existing entry
await redis.zrem(`${REDIS_PREFIX}expungequeue`, `${account}:${messageId}`);
logger.trace({
msg: 'Removed entry from expunge queue',
account,
messageId
});
} catch (err) {
logger.info({
msg: 'Dailed to delete embeddings for a missing email',
account,
messageId,
err
});
}
} else {
// clear existing entry
await redis.zrem(`${REDIS_PREFIX}expungequeue`, `${account}:${messageId}`);
logger.trace({
msg: 'Removed still existing entry from expunge queue',
account,
messageId
});
}
}
}
} catch (err) {
logger.error({ msg: 'Failed to retrieve expunged entries', rangeStart: 0, rangeEnd, err });
await new Promise(resolve => {
let timer = setTimeout(resolve, GB_FAILURE_DELAY);
timer.unref();
});
return;
}
};

function runGarbageCollector() {
clearExpungedEmbeddings()
.catch(err => {
logger.error({ msg: 'Failed to run garbage collector for embeddings', err });
})
.finally(() => runGarbageCollector());
}

logger.info({ msg: 'Started Documents worker thread', version: packageData.version });

runGarbageCollector();

0 comments on commit d3ec6f7

Please sign in to comment.