From e3d490dc8f838ae5264f20ea04dd0be7a65da7f9 Mon Sep 17 00:00:00 2001
From: antoineludeau <52679050+antoineludeau@users.noreply.github.com>
Date: Thu, 20 Jun 2024 22:21:40 +0200
Subject: [PATCH] Added report routes, storage and batch processing

---
Review notes (this section is ignored when the patch is applied):
- GET report handlers send their response exactly once.
- Report reconstruction keeps every operation of a category and tolerates
  reports without 'meta' or 'preProcessingResponse'.
- The districtID of the route can no longer be overridden by the request body.
- The invalid 'upsert' option of insertOne and a debug log are removed.

 lib/api/consumers/build-reports.js |  26 ++++++
 lib/api/report/routes.js           | 100 ++++++++++++++++++++++
 lib/api/report/utils.js            | 132 +++++++++++++++++++++++++++++
 lib/api/routes.js                  |   2 +
 worker.js                          |   3 +
 5 files changed, 263 insertions(+)
 create mode 100644 lib/api/consumers/build-reports.js
 create mode 100644 lib/api/report/routes.js
 create mode 100644 lib/api/report/utils.js

diff --git a/lib/api/consumers/build-reports.js b/lib/api/consumers/build-reports.js
new file mode 100644
--- /dev/null
+++ b/lib/api/consumers/build-reports.js
@@ -0,0 +1,26 @@
+import mongo from '../../util/mongo.cjs'
+import {formatAndUpdateReports} from '../report/utils.js'
+
+// Queue consumer: finalizes the processing reports that are still waiting for
+// the outcome of their asynchronous BAN API jobs.
+const buildReportsConsumer = async () => {
+  try {
+    console.log('Building reports...')
+    // Same criteria as the reconstruction check in formatAndUpdateReport
+    const filters = {
+      status: null,
+      preProcessingStatusKey: 0,
+      'meta.targetedPlateform': 'ban',
+      preProcessingResponse: {$ne: {}}
+    }
+
+    const reportsNotCompletelyBuilt = await mongo.db.collection('processing-reports').find(filters).toArray()
+    await formatAndUpdateReports(reportsNotCompletelyBuilt)
+    console.log('Reports built successfully')
+  } catch (error) {
+    // Best effort: the error is logged and the job is not failed
+    console.error(error)
+  }
+}
+
+export default buildReportsConsumer
diff --git a/lib/api/report/routes.js b/lib/api/report/routes.js
new file mode 100644
--- /dev/null
+++ b/lib/api/report/routes.js
@@ -0,0 +1,100 @@
+import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import
+import express from 'express'
+import {getDistrict} from '../district/models.js'
+import mongo from '../../util/mongo.cjs'
+import auth from '../../middleware/auth.js'
+import analyticsMiddleware from '../../middleware/analytics.js'
+import {formatAndUpdateReports, formatReportToInsert} from './utils.js'
+
+// Maximum number of processing reports kept per district
+const MAX_REPORTS_PER_DISTRICT = 5
+
+const app = new express.Router()
+
+// Build the body sent when a handler fails
+const buildErrorResponse = ({message}) => ({
+  date: new Date(),
+  status: 'error',
+  message,
+  response: {},
+})
+
+// Send the formatted reports matching `filter`, most recent pre-processing first.
+// The response is sent exactly once: a second res.send() after a successful one
+// would throw because the headers are already sent.
+const sendReports = async (res, filter) => {
+  let response
+  try {
+    const reports = await mongo.db.collection('processing-reports')
+      .find(filter)
+      .sort({preProcessingDate: -1})
+      .toArray()
+    response = await formatAndUpdateReports(reports)
+  } catch (error) {
+    response = buildErrorResponse(error)
+  }
+
+  res.send(response)
+}
+
+// List the processing reports of a district
+app.route('/district/:districtID')
+  .get(auth, analyticsMiddleware, async (req, res) => {
+    const {districtID} = req.params
+    await sendReports(res, {districtID})
+  })
+
+// List the processing reports matching a 'meta.cog' value
+app.route('/district/cog/:cog')
+  .get(auth, analyticsMiddleware, async (req, res) => {
+    const {cog} = req.params
+    await sendReports(res, {'meta.cog': cog})
+  })
+
+// Store a new processing report for a known district
+app.route('/district/:districtID')
+  .post(auth, analyticsMiddleware, async (req, res) => {
+    let response
+    try {
+      const {districtID} = req.params
+      const district = await getDistrict(districtID)
+
+      if (!district) {
+        res.status(404).send('Request ID unknown')
+        return
+      }
+
+      const collection = mongo.db.collection('processing-reports')
+
+      // Count the number of documents with the same districtID
+      const count = await collection.countDocuments({districtID})
+
+      // When the limit is reached, delete the oldest document based on preProcessingDate
+      if (count >= MAX_REPORTS_PER_DISTRICT) {
+        const [oldestDoc] = await collection.find({districtID})
+          .sort({preProcessingDate: 1})
+          .limit(1)
+          .toArray()
+        if (oldestDoc) {
+          await collection.deleteOne({_id: oldestDoc._id})
+        }
+      }
+
+      const formattedReportToInsert = formatReportToInsert(districtID, req.body)
+
+      // 'upsert' is not an insertOne option: the report is always stored as a new document
+      await collection.insertOne(formattedReportToInsert)
+      response = {
+        date: new Date(),
+        status: 'success',
+        message: 'Processing report created successfully',
+        response: {},
+      }
+    } catch (error) {
+      response = buildErrorResponse(error)
+    }
+
+    res.send(response)
+  })
+
+export default app
diff --git a/lib/api/report/utils.js b/lib/api/report/utils.js
new file mode 100644
--- /dev/null
+++ b/lib/api/report/utils.js
@@ -0,0 +1,132 @@
+import mongo from '../../util/mongo.cjs'
+import {getJobStatus} from '../job-status/models.js'
+
+const categories = ['addresses', 'commonToponyms', 'districts']
+const operations = ['add', 'update', 'delete']
+
+// Build the document stored in 'processing-reports' from a report payload.
+// districtID is written after the payload so a 'districtID' key in the payload
+// cannot override it; preProcessingDate, when provided, is converted to a Date.
+export const formatReportToInsert = (districtID, report) => {
+  const {preProcessingDate, ...rest} = report
+  return {
+    ...rest,
+    districtID,
+    ...(preProcessingDate ? {preProcessingDate: new Date(preProcessingDate)} : {}),
+  }
+}
+
+// Format all the reports concurrently; results keep the order of the input
+export const formatAndUpdateReports = async reports => Promise.all(reports.map(report => formatAndUpdateReport(report)))
+
+// Fetch the job status of every pre-processing entry.
+// Throws when an entry has no statusID or when its job status is not found.
+const getJobStatuses = async entries => Promise.all(
+  entries.map(async entry => {
+    const statusID = entry?.response?.statusID
+    if (!statusID) {
+      throw new Error('Missing statusID in pre-processing report response')
+    }
+
+    const jobStatus = await getJobStatus(statusID)
+    if (!jobStatus) {
+      throw new Error(`Job status ${statusID} not found : either pending or expired`)
+    }
+
+    return jobStatus
+  })
+)
+
+// Sum the counts of the job statuses by outcome and keep the report of each failed job
+const summarizeJobStatuses = jobStatuses => {
+  const summary = {
+    success: {count: 0},
+    error: {count: 0, report: []},
+  }
+
+  for (const {status, count, report} of jobStatuses) {
+    if (status === 'error') {
+      summary.error.count += count
+      summary.error.report.push(report)
+    } else if (status === 'success') {
+      summary.success.count += count
+    }
+  }
+
+  return summary
+}
+
+// Return the latest 'updatedAt' of the job statuses when it is more recent than `fallback`
+const getMostRecentDate = (jobStatuses, fallback) => jobStatuses.reduce((latest, {updatedAt}) => {
+  const date = new Date(updatedAt)
+  return date > latest ? date : latest
+}, fallback)
+
+// Return the report without its _id, completed with its final processing status
+// when it has to be reconstructed; any other report is returned unchanged.
+const formatAndUpdateReport = async report => {
+  const {_id, ...reportRest} = report
+  const {status, preProcessingStatusKey, preProcessingResponse, meta} = reportRest
+  // If the report does not have a final status yet and the pre-processing status is in success, we need to reconstruct it.
+  // The flag 'targetedPlateform' is used to determine if the pre-processing data has been sent to the ban APIs or the legacy compose API.
+  // Our ban APIs are asynchronous so the pre-processing report needs to be reconstructed to get the final status.
+  // 'meta' and 'preProcessingResponse' may be missing from a stored report, hence the guards.
+  const needsReconstruction = !status
+    && preProcessingStatusKey === 0
+    && meta?.targetedPlateform === 'ban'
+    && Object.keys(preProcessingResponse || {}).length > 0
+
+  if (!needsReconstruction) {
+    return report
+  }
+
+  try {
+    let errorCount = 0
+    let mostRecentDate = new Date(0)
+    const formattedResponse = {}
+
+    for (const category of categories) {
+      for (const operation of operations) {
+        const entries = preProcessingResponse[category]?.[operation]
+        if (!entries) {
+          continue
+        }
+
+        const jobStatuses = await getJobStatuses(entries) // eslint-disable-line no-await-in-loop
+        const summary = summarizeJobStatuses(jobStatuses)
+        errorCount += summary.error.report.length
+        mostRecentDate = getMostRecentDate(jobStatuses, mostRecentDate)
+
+        // Create the category entry only once so that all its operations are kept
+        formattedResponse[category] = formattedResponse[category] || {}
+        formattedResponse[category][operation] = summary
+      }
+    }
+
+    const processingReport = {
+      statusKey: errorCount ? 1 : 0,
+      status: errorCount ? 'error' : 'success',
+      message: errorCount ? 'Processed with errors' : 'Processed successfully',
+      response: formattedResponse,
+      date: mostRecentDate
+    }
+
+    // Update the report with the processing report
+    await mongo.db.collection('processing-reports').updateOne({_id}, {$set: processingReport})
+
+    return {
+      ...reportRest,
+      ...processingReport
+    }
+  } catch (error) {
+    // The failure is returned to the caller but not stored
+    return {
+      ...reportRest,
+      statusKey: 1,
+      status: 'error',
+      message: `Could not process the report : ${error.message}`,
+      response: {},
+      date: new Date()
+    }
+  }
+}
diff --git a/lib/api/routes.js b/lib/api/routes.js
index 2ac47ff5..66530530 100644
--- a/lib/api/routes.js
+++ b/lib/api/routes.js
@@ -6,6 +6,7 @@ import commonToponymRoutes from './common-toponym/routes.js'
 import districtRoutes from './district/routes.js'
 import statusRoutes from './job-status/routes.js'
 import banIdRoutes from './ban-id/routes.js'
+import reportRoutes from './report/routes.js'
 
 const app = new express.Router()
 
@@ -14,5 +15,6 @@ app.use('/common-toponym', commonToponymRoutes)
 app.use('/district', districtRoutes)
 app.use('/job-status', statusRoutes)
 app.use('/ban-id', banIdRoutes)
+app.use('/report/', reportRoutes)
 
 export default app
diff --git a/worker.js b/worker.js
index d5e359ab..f888082c 100644
--- a/worker.js
+++ b/worker.js
@@ -6,6 +6,7 @@ import ms from 'ms'
 import apiConsumer from './lib/api/consumers/api-consumer.js'
 import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js'
 import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js'
+import buildReportsConsumer from './lib/api/consumers/build-reports.js'
 
 import mongo from './lib/util/mongo.cjs'
 import queue from './lib/util/queue.cjs'
@@ -36,6 +37,8 @@ async function main() {
   queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer)
   queue('clean-job-status').process(1, cleanJobStatusConsumer)
   queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
+  queue('build-reports').process(1, buildReportsConsumer)
+  queue('build-reports').add({}, {jobId: 'buildReportsJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
 }
 
 main().catch(error => {