Skip to content

Commit

Permalink
Added report routes, storage and batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
antoineludeau committed Jul 31, 2024
1 parent ae588a6 commit e4197ed
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 0 deletions.
22 changes: 22 additions & 0 deletions lib/api/consumers/build-reports.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import mongo from '../../util/mongo.cjs'
import {formatAndUpdateReports} from '../report/utils.js'

const buildReportsConsumer = async () => {
try {
console.log('Building reports...')
const filters = {
status: null,
preProcessingStatusKey: 0,
'meta.targetedPlateform': 'ban',
preProcessingResponse: {$ne: {}}
}

const reportsNotCompletelyBuilt = await mongo.db.collection('processing_reports').find({...filters}).toArray()
await formatAndUpdateReports(reportsNotCompletelyBuilt)
console.log('Reports built successfully')
} catch (error) {
console.error(error)
}
}

export default buildReportsConsumer
102 changes: 102 additions & 0 deletions lib/api/report/routes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import
import express from 'express'
import {getDistrict} from '../district/models.js'
import mongo from '../../util/mongo.cjs'
import auth from '../../middleware/auth.js'
import analyticsMiddleware from '../../middleware/analytics.js'
import {formatAndUpdateReports, formatReportToInsert} from './utils.js'

const app = new express.Router()

app.route('/district/:districtID')
.get(auth, analyticsMiddleware, async (req, res) => {
let response
try {
const {districtID} = req.params
const reports = await mongo.db.collection('processing_reports')
.find({districtID})
.sort({preProcessingDate: -1})
.toArray()
const formattedReports = await formatAndUpdateReports(reports)
res.send(formattedReports)
} catch (error) {
const {message} = error
response = {
date: new Date(),
status: 'error',
message,
response: {},
}
}

res.send(response)
})

app.route('/district/cog/:cog')
.get(auth, analyticsMiddleware, async (req, res) => {
let response
try {
const {cog} = req.params
const reports = await mongo.db.collection('processing_reports')
.find({'meta.cog': cog})
.sort({preProcessingDate: 1})
.toArray()
const formattedReports = await formatAndUpdateReports(reports)
res.send(formattedReports)
} catch (error) {
const {message} = error
response = {
date: new Date(),
status: 'error',
message,
response: {},
}
}

res.send(response)
})

app.route('/district/:districtID')
.post(auth, analyticsMiddleware, async (req, res) => {
let response
try {
const {districtID} = req.params
const district = await getDistrict(districtID)

if (!district) {
res.status(404).send('Request ID unknown')
return
}

// Count the number of documents with the same districtID
const count = await mongo.db.collection('processing_reports').countDocuments({districtID})

// If the count is 5 or more, delete the oldest document based on preProcessingDate
if (count >= 5) {
await mongo.db.collection('processing_reports').findOneAndDelete({districtID}, {sort: {preProcessingDate: 1}}).toArray()
}

const report = req.body
const formatedReportToInsert = formatReportToInsert(districtID, report)

await mongo.db.collection('processing_reports').insertOne(formatedReportToInsert, {upsert: true})
response = {
date: new Date(),
status: 'success',
message: 'Processing report created successfully',
response: {},
}
} catch (error) {
const {message} = error
response = {
date: new Date(),
status: 'error',
message,
response: {},
}
}

res.send(response)
})

export default app
134 changes: 134 additions & 0 deletions lib/api/report/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import {PromisePool} from '@supercharge/promise-pool'
import mongo from '../../util/mongo.cjs'
import {getJobStatus} from '../job-status/models.js'

const categories = ['addresses', 'commonToponyms', 'districts']
const operations = ['add', 'update', 'delete']

// Status key: 0 = success, 1 = error, -1 = unknown

export const formatReportToInsert = (districtID, report) => {
const {preProcessingDate, ...rest} = report
return {
districtID,
...rest,
...(preProcessingDate ? {preProcessingDate: new Date(preProcessingDate)} : {}),
}
}

export const formatAndUpdateReports = async reports => {
const {results} = await PromisePool
.withConcurrency(10)
.for(reports)
.process(formatAndUpdateReport)
return results
}

const formatAndUpdateReport = async report => {
const {_id, ...reportRest} = report
const {status, preProcessingStatusKey, preProcessingResponse, meta: {targetedPlateform}} = reportRest
// If the report does not have a final status yet and the pre-processing status is in success, we need to reconstruct.
// the flag 'targetedPlateform' is used to determine if the pre-processing data has been sent to the ban APIs or the legacy compose API
// Our ban APIs are asynchronous so the preprocessing report need to be reconstructed to get the final status
if (!status && preProcessingStatusKey === 0
&& targetedPlateform === 'ban'
&& Object.keys(preProcessingResponse).length > 0
) {
try {
let errorCount = 0
let mostRecentDate = new Date(0)
const formattedResponse = {}

for (const category of categories) {
for (const operation of operations) {
if (!preProcessingResponse[category] || !preProcessingResponse[category][operation]) { // eslint-disable-line max-depth
continue
}

formattedResponse[category] ??= {}
const jobStatusArr = await Promise.all( // eslint-disable-line no-await-in-loop
preProcessingResponse[category][operation].map(async report => {
const statusID = report?.response?.statusID
if (!statusID) {
throw new Error('Missing statusID in pre-processing report response')
}

const jobStatus = await getJobStatus(statusID)
if (!jobStatus) {
throw new Error(`Job status ${report.response.statusID} not found : either pending or expired`)
}

return jobStatus
})
)

formattedResponse[category][operation] = jobStatusArr.reduce((acc, jobStatus) => {
const {status, count, report, updatedAt} = jobStatus

const date = new Date(updatedAt)
if (date > mostRecentDate) {
mostRecentDate = date
}

if (status === 'error') {
errorCount++
acc.error = {
count: acc.error ? acc.error.count + count : count,
report: [...acc.error.report, report],
}
}

if (status === 'success') {
acc.success = {
count: acc.success ? acc.success.count + count : count,
}
}

return acc
}, {
success: {
count: 0,
},
error: {
count: 0,
report: []
},
})
}
}

const processingReport = {
statusKey: errorCount ? 1 : 0,
status: errorCount ? 'error' : 'success',
message: errorCount ? 'Processed with errors' : 'Processed successfully',
response: formattedResponse,
date: mostRecentDate
}

// Update the report with the processing report
await mongo.db.collection('processing_reports').updateOne({_id}, {$set: {...processingReport}})

return {
...reportRest,
...processingReport
}
} catch (error) {
const message = `Could not process the report : ${error.message}`
console.log(message)
const processingReport = {
statusKey: -1,
status: 'error',
message,
response: {},
date: new Date()
}

return {
...reportRest,
...processingReport
}
}
}

return reportRest
}
2 changes: 2 additions & 0 deletions lib/api/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import commonToponymRoutes from './common-toponym/routes.js'
import districtRoutes from './district/routes.js'
import statusRoutes from './job-status/routes.js'
import banIdRoutes from './ban-id/routes.js'
import reportRoutes from './report/routes.js'

const app = new express.Router()

Expand All @@ -14,5 +15,6 @@ app.use('/common-toponym', commonToponymRoutes)
app.use('/district', districtRoutes)
app.use('/job-status', statusRoutes)
app.use('/ban-id', banIdRoutes)
app.use('/report', reportRoutes)

export default app
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"@etalab/project-legal": "^0.6.0",
"@keyv/sqlite": "^3.6.5",
"@mapbox/tilebelt": "^1.0.2",
"@supercharge/promise-pool": "^3.2.0",
"@turf/turf": "^6.5.0",
"bluebird": "^3.7.2",
"bull": "^3.29.3",
Expand Down
3 changes: 3 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ms from 'ms'
import apiConsumer from './lib/api/consumers/api-consumer.js'
import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js'
import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js'
import buildReportsConsumer from './lib/api/consumers/build-reports.js'

import mongo from './lib/util/mongo.cjs'
import queue from './lib/util/queue.cjs'
Expand Down Expand Up @@ -36,6 +37,8 @@ async function main() {
queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer)
queue('clean-job-status').process(1, cleanJobStatusConsumer)
queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
queue('build-reports').process(1, buildReportsConsumer)
queue('build-reports').add({}, {jobId: 'buildReportsJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
}

main().catch(error => {
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,11 @@
dependencies:
"@sinonjs/commons" "^2.0.0"

"@supercharge/promise-pool@^3.2.0":
version "3.2.0"
resolved "https://registry.yarnpkg.com/@supercharge/promise-pool/-/promise-pool-3.2.0.tgz#a6ab4afdf798e453a6bb51c4ae340852e1266af8"
integrity sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg==

"@tootallnate/once@1":
version "1.1.2"
resolved "https://registry.yarnpkg.com/@tootallnate/once/-/once-1.1.2.tgz#ccb91445360179a04e7fe6aff78c00ffc1eeaf82"
Expand Down

0 comments on commit e4197ed

Please sign in to comment.