From 86538baff7037598788e38b68495f4d3958d52bc Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Fri, 6 Oct 2023 09:18:44 +0300 Subject: [PATCH] fix(chat): Fixed chat feature support for older Redis versions --- workers/documents.js | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/workers/documents.js b/workers/documents.js index 0888ac17..69053354 100644 --- a/workers/documents.js +++ b/workers/documents.js @@ -10,7 +10,7 @@ const crypto = require('crypto'); const { readEnvValue, threadStats, getDuration } = require('../lib/tools'); -const GB_COLLECT_DELAY = 100; // 6 * 3600 * 1000; // 6h +const GB_COLLECT_DELAY = 6 * 3600 * 1000; // 6h const GB_FAILURE_DELAY = 3 * 1000; const GB_EMPTY_DELAY = 10 * 1000; @@ -940,11 +940,18 @@ documentsWorker.on('failed', async job => { }); const clearExpungedEmbeddings = async () => { - const { index, client } = await getESClient(logger); + if (!(await redis.exists(`${REDIS_PREFIX}expungequeue`))) { + // nothing to do here + await new Promise(resolve => { + let timer = setTimeout(resolve, GB_EMPTY_DELAY); + timer.unref(); + }); + return; + } + let rangeEnd = Date.now() - GB_COLLECT_DELAY; try { - let expungedEntry = await redis.zrange(`${REDIS_PREFIX}expungequeue`, 0, rangeEnd, 'BYSCORE', 'LIMIT', '0', '1'); - + let expungedEntry = await redis.zrangebyscore(`${REDIS_PREFIX}expungequeue`, 0, rangeEnd, 'LIMIT', '0', '1'); if (!expungedEntry?.length) { await new Promise(resolve => { let timer = setTimeout(resolve, GB_EMPTY_DELAY); @@ -978,8 +985,9 @@ const clearExpungedEmbeddings = async () => { } }; - let existingResult; + const { index, client } = await getESClient(logger); + let existingResult; try { existingResult = await client.search({ index,