Skip to content

Commit

Permalink
add user mbdscore sync workers and cronjob
Browse files Browse the repository at this point in the history
  • Loading branch information
CarlosQ96 committed Sep 25, 2024
1 parent a14c97d commit 5ca6f02
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/repositories/qfRoundRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const qfRoundEstimatedMatchingParamsCacheDuration = Number(
process.env.QF_ROUND_ESTIMATED_MATCHING_CACHE_DURATION || 60000,
);

const qfRoundUsersMissedMBDScore = Number(
process.env.QF_ROUND_USERS_MISSED_SCORE || 0

Check failure on line 19 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Insert `,`
);

const qfRoundsCacheDuration =
(config.get('QF_ROUND_AND_MAIN_CATEGORIES_CACHE_DURATION') as number) ||
1000 * 60 * 2;
Expand Down Expand Up @@ -172,6 +176,26 @@ export const findActiveQfRound = async (
return query.cache('findActiveQfRound', qfRoundsCacheDuration).getOne();
};

export const findUsersWithoutMBDScoreInActiveAround = async (): Promise<number[]> => {

Check failure on line 179 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Replace `number[]` with `⏎··number[]⏎`
const activeQfRoundId = ((await findActiveQfRound())?.id || qfRoundUsersMissedMBDScore);

Check failure on line 180 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Replace `·((await·findActiveQfRound())?.id·||·qfRoundUsersMissedMBDScore)` with `⏎····(await·findActiveQfRound())?.id·||·qfRoundUsersMissedMBDScore`

if (!activeQfRoundId || activeQfRoundId === 0) return [];

const usersMissingMDBScore = await QfRound.query(`

Check failure on line 184 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Insert `⏎····`
SELECT DISTINCT d."userId"
FROM public.donation d
LEFT JOIN user_qf_round_model_score uqrms ON d."userId" = uqrms."userId" AND uqrms."qfRoundId" = $1
WHERE d."qfRoundId" = $1
AND d.status = 'verified'
AND uqrms.id IS NULL
AND d."userId" IS NOT NULL
ORDER BY d."userId";
`, [activeQfRoundId],

Check failure on line 193 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Insert `⏎···`
);

return usersMissingMDBScore;
}

Check failure on line 197 in src/repositories/qfRoundRepository.ts

View workflow job for this annotation

GitHub Actions / test

Insert `;`

export const findQfRoundById = async (id: number): Promise<QfRound | null> => {
return QfRound.createQueryBuilder('qf_round').where(`id = ${id}`).getOne();
};
Expand Down
2 changes: 2 additions & 0 deletions src/server/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import { runCheckUserSuperTokenBalancesJob } from '../services/cronJobs/checkUse
import { runCheckPendingRecurringDonationsCronJob } from '../services/cronJobs/syncRecurringDonationsWithNetwork';
import { runCheckQRTransactionJob } from '../services/cronJobs/checkQRTransactionJob';
import { addClient } from '../services/sse/sse';
import { runCheckPendingUserModelScoreCronjob } from '../services/cronJobs/syncUsersModelScore';

Resource.validate = validate;

Expand Down Expand Up @@ -361,6 +362,7 @@ export async function bootstrap() {
runCheckPendingRecurringDonationsCronJob();
runNotifyMissingDonationsCronJob();
runCheckPendingProjectListingCronJob();
runCheckPendingUserModelScoreCronjob();

if (process.env.PROJECT_REVOKE_SERVICE_ACTIVE === 'true') {
runCheckProjectVerificationStatus();
Expand Down
71 changes: 71 additions & 0 deletions src/services/cronJobs/syncUsersModelScore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { schedule } from "node-cron";

Check failure on line 1 in src/services/cronJobs/syncUsersModelScore.ts

View workflow job for this annotation

GitHub Actions / test

Replace `"node-cron"` with `'node-cron'`
import config from "../../config";

Check failure on line 2 in src/services/cronJobs/syncUsersModelScore.ts

View workflow job for this annotation

GitHub Actions / test

Replace `"../../config"` with `'../../config'`
import { ModuleThread, Pool, spawn, Worker } from "threads";

Check failure on line 3 in src/services/cronJobs/syncUsersModelScore.ts

View workflow job for this annotation

GitHub Actions / test

`threads` import should occur before import of `../../config`

Check failure on line 3 in src/services/cronJobs/syncUsersModelScore.ts

View workflow job for this annotation

GitHub Actions / test

Replace `"threads"` with `'threads'`
import { logger } from "../../utils/logger";
import { findActiveQfRound, findUsersWithoutMBDScoreInActiveAround } from "../../repositories/qfRoundRepository";
import { UserMDBScoreSyncWorker } from "../../workers/usersMBDScoreSyncWorker";
import { findUserById } from "../../repositories/userRepository";
import { UserQfRoundModelScore } from "../../entities/userQfRoundModelScore";

const cronJobTime =
(config.get('MAKE_UNREVIEWED_PROJECT_LISTED_CRONJOB_EXPRESSION') as string) ||
'0 0 * * *';

const qfRoundUsersMissedMBDScore = Number(
process.env.QF_ROUND_USERS_MISSED_SCORE || 0
);

const workerOptions = {
concurrency: Number(
process.env.USER_SCORE_SYNC_THREADS_POOL_CONCURRENCY || 1,
),
name:
process.env.USER_SCORE_SYNC_THREADS_POOL_NAME || 'ProjectFiltersThreadPool',
size: Number(process.env.USER_SCORE_SYNC_THREADS_POOL_SIZE || 4),
};

export const runCheckPendingUserModelScoreCronjob = () => {
logger.debug(
'runCheckPendingUserModelScoreCronjob() has been called, cronJobTime',
cronJobTime,
);
schedule(cronJobTime, async () => {
await updateUsersWithoutMBDScoreInRound();
});
};

export const updateUsersWithoutMBDScoreInRound = async () => {
const usersMDBScoreSyncThreadPool: Pool<ModuleThread<UserMDBScoreSyncWorker>> =
Pool(
() => spawn(new Worker('../workers/usersMBDScoreSyncWoker')),
workerOptions,
);

const userIds = await findUsersWithoutMBDScoreInActiveAround();
const activeQfRoundId = ((await findActiveQfRound())?.id || qfRoundUsersMissedMBDScore);
if (!activeQfRoundId || activeQfRoundId === 0) return;

if (userIds.length === 0) return;

for (const userId of userIds) {
try {
const userWallet = await findUserById(userId);
const userScore = await usersMDBScoreSyncThreadPool.queue(worker =>
worker.syncUserScore({
userWallet,
}),
);
if (userScore) {
const userScoreInRound = UserQfRoundModelScore.create({
userId,
qfRoundId: activeQfRoundId,
score: userScore,
});

await userScoreInRound.save();
}
} catch (e) {
logger.info(`User with Id ${userId} did not sync MBD score this batch`);
}
}
};
20 changes: 20 additions & 0 deletions src/workers/usersMBDScoreSyncWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// workers/auth.js
import { expose } from 'threads/worker';
import { WorkerModule } from 'threads/dist/types/worker';
import { getGitcoinAdapter } from '../adapters/adaptersFactory';

type UsersMBDScoreSyncWorkerFunctions =
| 'syncUserScore';

export type UserMDBScoreSyncWorker =
WorkerModule<UsersMBDScoreSyncWorkerFunctions>;

const worker: UserMDBScoreSyncWorker = {
async syncUserScore(args: {
userWallet: string;
}) {
return await getGitcoinAdapter().getUserAnalysisScore(args.userWallet);
},
};

expose(worker);

0 comments on commit 5ca6f02

Please sign in to comment.