From ae588a6bff3ded2774c5682854c5e63ebeec2a0a Mon Sep 17 00:00:00 2001 From: antoineludeau <52679050+antoineludeau@users.noreply.github.com> Date: Thu, 6 Jun 2024 15:08:36 +0200 Subject: [PATCH 1/2] Added global error catch for legacy compose --- lib/jobs/compose-commune.cjs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/jobs/compose-commune.cjs b/lib/jobs/compose-commune.cjs index 660ba6aa..8a392aca 100644 --- a/lib/jobs/compose-commune.cjs +++ b/lib/jobs/compose-commune.cjs @@ -3,18 +3,22 @@ const {getCommune, finishComposition} = require('../models/commune.cjs') const composeCommune = require('../compose/index.cjs') async function handle({data: {codeCommune, compositionAskedAt, ignoreIdConfig}}) { - const communeEntry = await getCommune(codeCommune) + try { + const communeEntry = await getCommune(codeCommune) - if (!communeEntry.compositionAskedAt || !isEqual(communeEntry.compositionAskedAt, parseISO(compositionAskedAt))) { - return - } + if (!communeEntry.compositionAskedAt || !isEqual(communeEntry.compositionAskedAt, parseISO(compositionAskedAt))) { + return + } - console.log(`Composition des adresses de la commune ${codeCommune}`) + console.log(`Composition des adresses de la commune ${codeCommune}`) - await composeCommune(codeCommune, ignoreIdConfig) - await finishComposition(codeCommune) + await composeCommune(codeCommune, ignoreIdConfig) + await finishComposition(codeCommune) - console.log(`Composition des adresses de la commune ${codeCommune} terminée`) + console.log(`Composition des adresses de la commune ${codeCommune} terminée`) + } catch (error) { + console.error(`Erreur lors de la composition de la commune ${codeCommune}`, error) + } } module.exports = handle From e4197edcd57c6cca3fcb3a7107bf8fd5c42af124 Mon Sep 17 00:00:00 2001 From: antoineludeau <52679050+antoineludeau@users.noreply.github.com> Date: Thu, 20 Jun 2024 22:21:40 +0200 Subject: [PATCH 2/2] Added report routes, storage and batch processing --- lib/api/consumers/build-reports.js | 22 +++++ lib/api/report/routes.js | 102 ++++++++++++++++++++++ lib/api/report/utils.js | 134 +++++++++++++++++++++++++++++ lib/api/routes.js | 2 + package.json | 1 + worker.js | 3 + yarn.lock | 5 ++ 7 files changed, 269 insertions(+) create mode 100644 lib/api/consumers/build-reports.js create mode 100644 lib/api/report/routes.js create mode 100644 lib/api/report/utils.js diff --git a/lib/api/consumers/build-reports.js b/lib/api/consumers/build-reports.js new file mode 100644 index 00000000..33bb91ad --- /dev/null +++ b/lib/api/consumers/build-reports.js @@ -0,0 +1,22 @@ +import mongo from '../../util/mongo.cjs' +import {formatAndUpdateReports} from '../report/utils.js' + +const buildReportsConsumer = async () => { + try { + console.log('Building reports...') + const filters = { + status: null, + preProcessingStatusKey: 0, + 'meta.targetedPlateform': 'ban', + preProcessingResponse: {$ne: {}} + } + + const reportsNotCompletelyBuilt = await mongo.db.collection('processing_reports').find({...filters}).toArray() + await formatAndUpdateReports(reportsNotCompletelyBuilt) + console.log('Reports built successfully') + } catch (error) { + console.error(error) + } +} + +export default buildReportsConsumer diff --git a/lib/api/report/routes.js b/lib/api/report/routes.js new file mode 100644 index 00000000..583f989a --- /dev/null +++ b/lib/api/report/routes.js @@ -0,0 +1,102 @@ +import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import +import express from 'express' +import {getDistrict} from '../district/models.js' +import mongo from '../../util/mongo.cjs' +import auth from '../../middleware/auth.js' +import analyticsMiddleware from '../../middleware/analytics.js' +import {formatAndUpdateReports, formatReportToInsert} from './utils.js' + +const app = new express.Router() + +app.route('/district/:districtID') + .get(auth, analyticsMiddleware, async (req, res) => { + let response + try { + const {districtID} = req.params + const reports = await mongo.db.collection('processing_reports') + .find({districtID}) + .sort({preProcessingDate: -1}) + .toArray() + const formattedReports = await formatAndUpdateReports(reports) + res.send(formattedReports) + } catch (error) { + const {message} = error + response = { + date: new Date(), + status: 'error', + message, + response: {}, + } + } + + res.send(response) + }) + +app.route('/district/cog/:cog') + .get(auth, analyticsMiddleware, async (req, res) => { + let response + try { + const {cog} = req.params + const reports = await mongo.db.collection('processing_reports') + .find({'meta.cog': cog}) + .sort({preProcessingDate: 1}) + .toArray() + const formattedReports = await formatAndUpdateReports(reports) + res.send(formattedReports) + } catch (error) { + const {message} = error + response = { + date: new Date(), + status: 'error', + message, + response: {}, + } + } + + res.send(response) + }) + +app.route('/district/:districtID') + .post(auth, analyticsMiddleware, async (req, res) => { + let response + try { + const {districtID} = req.params + const district = await getDistrict(districtID) + + if (!district) { + res.status(404).send('Request ID unknown') + return + } + + // Count the number of documents with the same districtID + const count = await mongo.db.collection('processing_reports').countDocuments({districtID}) + + // If the count is 5 or more, delete the oldest document based on preProcessingDate + if (count >= 5) { + await mongo.db.collection('processing_reports').findOneAndDelete({districtID}, {sort: {preProcessingDate: 1}}).toArray() + } + + const report = req.body + const formatedReportToInsert = formatReportToInsert(districtID, report) + + await mongo.db.collection('processing_reports').insertOne(formatedReportToInsert, {upsert: true}) + response = { + date: new Date(), + status: 'success', + message: 'Processing report created successfully', + response: {}, + } + } catch (error) { + const {message} = error + response = { + date: new Date(), + status: 'error', + message, + response: {}, + } + } + + res.send(response) + }) + +export default app diff --git a/lib/api/report/utils.js b/lib/api/report/utils.js new file mode 100644 index 00000000..ec25d35d --- /dev/null +++ b/lib/api/report/utils.js @@ -0,0 +1,134 @@ +import {PromisePool} from '@supercharge/promise-pool' +import mongo from '../../util/mongo.cjs' +import {getJobStatus} from '../job-status/models.js' + +const categories = ['addresses', 'commonToponyms', 'districts'] +const operations = ['add', 'update', 'delete'] + +// Status key: 0 = success, 1 = error, -1 = unknown + +export const formatReportToInsert = (districtID, report) => { + const {preProcessingDate, ...rest} = report + return { + districtID, + ...rest, + ...(preProcessingDate ? {preProcessingDate: new Date(preProcessingDate)} : {}), + } +} + +export const formatAndUpdateReports = async reports => { + const {results} = await PromisePool + .withConcurrency(10) + .for(reports) + .process(formatAndUpdateReport) + return results +} + +const formatAndUpdateReport = async report => { + const {_id, ...reportRest} = report + const {status, preProcessingStatusKey, preProcessingResponse, meta: {targetedPlateform}} = reportRest + // If the report does not have a final status yet and the pre-processing status is in success, we need to reconstruct. + // the flag 'targetedPlateform' is used to determine if the pre-processing data has been sent to the ban APIs or the legacy compose API + // Our ban APIs are asynchronous so the preprocessing report need to be reconstructed to get the final status + if (!status && preProcessingStatusKey === 0 + && targetedPlateform === 'ban' + && Object.keys(preProcessingResponse).length > 0 + ) { + try { + let errorCount = 0 + let mostRecentDate = new Date(0) + const formattedResponse = {} + + for (const category of categories) { + for (const operation of operations) { + if (!preProcessingResponse[category] || !preProcessingResponse[category][operation]) { // eslint-disable-line max-depth + continue + } + + formattedResponse[category] ??= {} + const jobStatusArr = await Promise.all( // eslint-disable-line no-await-in-loop + preProcessingResponse[category][operation].map(async report => { + const statusID = report?.response?.statusID + if (!statusID) { + throw new Error('Missing statusID in pre-processing report response') + } + + const jobStatus = await getJobStatus(statusID) + if (!jobStatus) { + throw new Error(`Job status ${report.response.statusID} not found : either pending or expired`) + } + + return jobStatus + }) + ) + + formattedResponse[category][operation] = jobStatusArr.reduce((acc, jobStatus) => { + const {status, count, report, updatedAt} = jobStatus + + const date = new Date(updatedAt) + if (date > mostRecentDate) { + mostRecentDate = date + } + + if (status === 'error') { + errorCount++ + acc.error = { + count: acc.error ? acc.error.count + count : count, + report: [...acc.error.report, report], + } + } + + if (status === 'success') { + acc.success = { + count: acc.success ? acc.success.count + count : count, + } + } + + return acc + }, { + success: { + count: 0, + }, + error: { + count: 0, + report: [] + }, + }) + } + } + + const processingReport = { + statusKey: errorCount ? 1 : 0, + status: errorCount ? 'error' : 'success', + message: errorCount ? 'Processed with errors' : 'Processed successfully', + response: formattedResponse, + date: mostRecentDate + } + + // Update the report with the processing report + await mongo.db.collection('processing_reports').updateOne({_id}, {$set: {...processingReport}}) + + return { + ...reportRest, + ...processingReport + } + } catch (error) { + const message = `Could not process the report : ${error.message}` + console.log(message) + const processingReport = { + statusKey: -1, + status: 'error', + message, + response: {}, + date: new Date() + } + + return { + ...reportRest, + ...processingReport + } + } + } + + return reportRest +} diff --git a/lib/api/routes.js b/lib/api/routes.js index 2ac47ff5..d49adac1 100644 --- a/lib/api/routes.js +++ b/lib/api/routes.js @@ -6,6 +6,7 @@ import commonToponymRoutes from './common-toponym/routes.js' import districtRoutes from './district/routes.js' import statusRoutes from './job-status/routes.js' import banIdRoutes from './ban-id/routes.js' +import reportRoutes from './report/routes.js' const app = new express.Router() @@ -14,5 +15,6 @@ app.use('/common-toponym', commonToponymRoutes) app.use('/district', districtRoutes) app.use('/job-status', statusRoutes) app.use('/ban-id', banIdRoutes) +app.use('/report', reportRoutes) export default app diff --git a/package.json b/package.json index 0766d5e0..a0250923 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ "@etalab/project-legal": "^0.6.0", "@keyv/sqlite": "^3.6.5", "@mapbox/tilebelt": "^1.0.2", + "@supercharge/promise-pool": "^3.2.0", "@turf/turf": "^6.5.0", "bluebird": "^3.7.2", "bull": "^3.29.3", diff --git a/worker.js b/worker.js index d5e359ab..f888082c 100644 --- a/worker.js +++ b/worker.js @@ -6,6 +6,7 @@ import ms from 'ms' import apiConsumer from './lib/api/consumers/api-consumer.js' import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js' import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js' +import buildReportsConsumer from './lib/api/consumers/build-reports.js' import mongo from './lib/util/mongo.cjs' import queue from './lib/util/queue.cjs' @@ -36,6 +37,8 @@ async function main() { queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer) queue('clean-job-status').process(1, cleanJobStatusConsumer) queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true}) + queue('build-reports').process(1, buildReportsConsumer) + queue('build-reports').add({}, {jobId: 'buildReportsJobId', repeat: {every: ms('1d')}, removeOnComplete: true}) } main().catch(error => { diff --git a/yarn.lock b/yarn.lock index 4bd9ea47..27417647 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1547,6 +1547,11 @@ dependencies: "@sinonjs/commons" "^2.0.0" +"@supercharge/promise-pool@^3.2.0": + version "3.2.0" + resolved "https://registry.yarnpkg.com/@supercharge/promise-pool/-/promise-pool-3.2.0.tgz#a6ab4afdf798e453a6bb51c4ae340852e1266af8" + integrity sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg== + "@tootallnate/once@1": version "1.1.2" resolved "https://registry.yarnpkg.com/@tootallnate/once/-/once-1.1.2.tgz#ccb91445360179a04e7fe6aff78c00ffc1eeaf82"