diff --git a/.env.sample b/.env.sample
index be52169f..594d90fe 100644
--- a/.env.sample
+++ b/.env.sample
@@ -20,6 +20,7 @@ BAN_API_URL=https://plateforme.adresse.data.gouv.fr/api
 ADMIN_TOKEN= # Used for legacy routes
 BAN_API_AUTHORIZED_TOKENS= # Used for new ban-id api routes
 PORT=5000
+EXPORT_TO_EXPLOITATION_DB_JOB_DELAY=10000 # Time in ms during which an export job is delayed before being processed
 
 # API de dépôt
 API_DEPOT_URL=https://plateforme.adresse.data.gouv.fr/api-depot
diff --git a/lib/api/address/models.js b/lib/api/address/models.js
index a785df4f..763b5c93 100644
--- a/lib/api/address/models.js
+++ b/lib/api/address/models.js
@@ -35,3 +35,19 @@ export async function deleteAddress(addressID) {
 export async function deleteAddresses(addressIDs) {
   return Address.destroy({where: {id: addressIDs}})
 }
+
+/**
+ * Fetch the district ID of every address matching the given IDs.
+ * One entry is returned per matching row, so the same district can appear several times.
+ *
+ * @param {Array} addressIDs - Address IDs matched against the `id` column
+ * @returns {Promise<Array>} `districtID` of each address found
+ */
+export async function getAllDistrictIDsFromAddresses(addressIDs) {
+  const rows = await Address.findAll({
+    attributes: ['districtID'],
+    where: {id: addressIDs},
+    raw: true
+  })
+  return rows.map(({districtID}) => districtID)
+}
diff --git a/lib/api/common-toponym/models.js b/lib/api/common-toponym/models.js
index 1cc6b9f4..b80f815c 100644
--- a/lib/api/common-toponym/models.js
+++ b/lib/api/common-toponym/models.js
@@ -35,3 +35,19 @@ export async function deleteCommonToponym(commonToponymID) {
 export async function deleteCommonToponyms(commonToponymIDs) {
   return CommonToponym.destroy({where: {id: commonToponymIDs}})
 }
+
+/**
+ * Fetch the district ID of every common toponym matching the given IDs.
+ * One entry is returned per matching row, so the same district can appear several times.
+ *
+ * @param {Array} commonToponymIDs - Common toponym IDs matched against the `id` column
+ * @returns {Promise<Array>} `districtID` of each common toponym found
+ */
+export async function getAllDistrictIDsFromCommonToponyms(commonToponymIDs) {
+  const rows = await CommonToponym.findAll({
+    attributes: ['districtID'],
+    where: {id: commonToponymIDs},
+    raw: true
+  })
+  return rows.map(({districtID}) => districtID)
+}
diff --git a/lib/api/consumers/api-consumer.js b/lib/api/consumers/api-consumer.js
index 672cb6a4..9892e512 100644
--- a/lib/api/consumers/api-consumer.js
+++ b/lib/api/consumers/api-consumer.js
@@ -1,11 +1,15 @@
-import {setJobStatus} from '../job-status/models.js'
-import {setAddresses, updateAddresses, deleteAddresses} from '../address/models.js'
+import queue from '../../util/queue.cjs'
+import {getJobStatus, setJobStatus} from '../job-status/models.js'
+import {setAddresses, updateAddresses, deleteAddresses, getAllDistrictIDsFromAddresses} from '../address/models.js'
 import {checkAddressesRequest, checkAddressesIDsRequest} from '../address/utils.js'
-import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms} from '../common-toponym/models.js'
+import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms, getAllDistrictIDsFromCommonToponyms} from '../common-toponym/models.js'
 import {checkCommonToponymsRequest, checkCommonToponymsIDsRequest} from '../common-toponym/utils.js'
 import {setDistricts, updateDistricts, deleteDistricts} from '../district/models.js'
 import {checkDistrictsRequest, checkDistrictsIDsRequest} from '../district/utils.js'
-import {dataValidationReportFrom} from '../helper.js'
+import {dataValidationReportFrom, addOrUpdateJob} from '../helper.js'
+
+const exportToExploitationDBQueue = queue('export-to-exploitation-db')
+const exportToExploitationDBJobDelay = Number(process.env.EXPORT_TO_EXPLOITATION_DB_JOB_DELAY || 10_000) // Env values are strings: convert to a number of ms
 
 export default async function apiConsumers({data: {dataType, jobType, data, statusID}}, done) {
   try {
@@ -22,6 +26,22 @@ export default async function apiConsumers({data: {dataType, jobType, data, stat
       default:
         console.warn(`Consumer Warn: Unknown data type : '${dataType}'`)
     }
+
+    const jobStatus = await getJobStatus(statusID)
+    if (jobStatus.status === 'success') {
+      // Export data from the postgresql database to the exploitation database
+      const relatedDistrictIDs = await extractRelatedDistrictIDs(dataType, jobType, data)
+      const uniqueRelatedDistrictIDs = [...new Set(relatedDistrictIDs)]
+      const addOrUpdateJobPromises = uniqueRelatedDistrictIDs.map(async districtID => {
+        await addOrUpdateJob(
+          exportToExploitationDBQueue,
+          districtID,
+          {districtID},
+          exportToExploitationDBJobDelay
+        )
+      })
+      await Promise.all(addOrUpdateJobPromises)
+    }
   } catch (error) {
     console.error(error)
     await setJobStatus(statusID, {
@@ -178,3 +198,61 @@ const districtConsumer = async (jobType, payload, statusID) => {
     })
   }
 }
+
+/**
+ * List the IDs of the districts touched by a job payload, so that they can be re-exported.
+ * IDs are returned as found and may contain duplicates.
+ *
+ * NOTE(review): for 'delete' jobs on addresses and common toponyms the district is read back
+ * from the database; if the rows are already gone when this runs, nothing is returned — verify.
+ *
+ * @param {string} dataType - 'address', 'commonToponym' or 'district'
+ * @param {string} jobType - 'insert', 'update' or 'delete'
+ * @param {Array} payload - Objects carrying an `id` for 'insert'/'update', plain IDs for 'delete'
+ * @returns {Promise<Array>} District IDs, empty when the data type or job type is unknown
+ */
+export const extractRelatedDistrictIDs = async (dataType, jobType, payload) => {
+  switch (dataType) {
+    case 'address':
+      switch (jobType) {
+        case 'insert':
+        case 'update':
+          return getAllDistrictIDsFromAddresses(payload.map(({id}) => id))
+        case 'delete':
+          return getAllDistrictIDsFromAddresses(payload)
+        default:
+          console.warn(`Unknown job type : '${jobType}'`)
+      }
+
+      break
+    case 'commonToponym':
+      switch (jobType) {
+        case 'insert':
+        case 'update':
+          return getAllDistrictIDsFromCommonToponyms(payload.map(({id}) => id))
+        case 'delete':
+          return getAllDistrictIDsFromCommonToponyms(payload)
+        default:
+          console.warn(`Unknown job type : '${jobType}'`)
+      }
+
+      break
+    case 'district':
+      switch (jobType) {
+        case 'insert':
+        case 'update':
+          return payload.map(({id}) => id)
+        case 'delete':
+          return payload
+        default:
+          console.warn(`Unknown job type : '${jobType}'`)
+      }
+
+      break
+    default:
+      console.warn(`Unknown data type : '${dataType}'`)
+  }
+
+  // Unknown data type or job type: nothing to export
+  return []
+}
diff --git a/lib/api/consumers/export-to-exploitation-db-consumer.js b/lib/api/consumers/export-to-exploitation-db-consumer.js
new file mode 100644
index 00000000..50df6964
--- /dev/null
+++ b/lib/api/consumers/export-to-exploitation-db-consumer.js
@@ -0,0 +1,121 @@
+import {Transaction} from 'sequelize'
+import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js'
+import mongo from '../../util/mongo.cjs'
+
+const DistrictCollection = 'districts'
+const CommonToponymCollection = 'common_toponyms'
+const AddressCollection = 'addresses'
+
+const pageSize = 100
+
+/**
+ * Queue consumer: rebuild the exploitation database copy of one district (the district
+ * itself, its common toponyms and its addresses) from the main SQL database.
+ *
+ * @param {object} job - Job handed over by the queue
+ * @param {object} job.data - Job payload
+ * @param {*} job.data.districtID - ID of the district to export
+ * @returns {Promise<void>}
+ * @throws {Error} When no district matches `districtID`; other failures are rethrown after rollback
+ */
+export default async function exportToExploitationDB({data}) {
+  const {districtID} = data
+  console.log(`Exporting districtID ${districtID} to exploitation DB...`)
+  // Use REPEATABLE_READ isolation level to balance data consistency and concurrency
+  // - Ensures data consistency within each table during the transaction
+  // - Allows concurrent reads across tables, minimizing read contention
+  const transactionMainDB = await sequelize.transaction({
+    isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
+  })
+
+  try {
+    // Sequelize only attaches a query to a transaction passed under the `transaction`
+    // option key, so every read below names that key explicitly
+    const district = await District.findOne({
+      where: {id: districtID},
+      transaction: transactionMainDB,
+      raw: true,
+    })
+
+    if (!district) {
+      throw new Error(`District with ID ${districtID} not found.`)
+    }
+
+    // District
+    // Delete all data related to the district
+    await deleteAllDataRelatedToDistrict(districtID)
+
+    // Insert the district
+    await mongo.db.collection(DistrictCollection).insertOne(district)
+
+    // CommonToponym
+    const totalCommonToponymRecords = await CommonToponym.count({
+      where: {districtID},
+      transaction: transactionMainDB,
+    })
+    const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / pageSize)
+
+    // Read one page of `model` rows for the district and insert it into `collection`
+    // NOTE(review): a failing page is only logged, so the job can still end as "done"
+    // with that page missing — confirm this best-effort behaviour is intended
+    const fetchAndExportData = async (model, collection, pageNumber) => {
+      const offset = (pageNumber - 1) * pageSize
+      try {
+        const pageData = await model.findAll({
+          where: {districtID},
+          order: [['id', 'ASC']],
+          offset,
+          limit: pageSize,
+          transaction: transactionMainDB,
+          raw: true,
+        })
+        // Insert the rows of the related page
+        await mongo.db.collection(collection).insertMany(pageData)
+      } catch (error) {
+        console.error(error)
+      }
+    }
+
+    const commonToponymsExportPromises = []
+
+    for (let pageNumber = 1; pageNumber <= totalCommonToponymPages; pageNumber++) {
+      commonToponymsExportPromises.push(
+        fetchAndExportData(CommonToponym, CommonToponymCollection, pageNumber)
+      )
+    }
+
+    await Promise.all(commonToponymsExportPromises)
+
+    // Address
+    const totalAddressRecords = await Address.count({
+      where: {districtID},
+      transaction: transactionMainDB,
+    })
+    const totalAddressPages = Math.ceil(totalAddressRecords / pageSize)
+
+    const addressesExportPromises = []
+
+    for (let pageNumber = 1; pageNumber <= totalAddressPages; pageNumber++) {
+      addressesExportPromises.push(
+        fetchAndExportData(Address, AddressCollection, pageNumber)
+      )
+    }
+
+    await Promise.all(addressesExportPromises)
+
+    await transactionMainDB.commit()
+    console.log(`Exporting districtID ${districtID} done`)
+  } catch (error) {
+    await transactionMainDB.rollback()
+    throw error
+  }
+}
+
+// Remove the district document and every common toponym / address document attached to it
+const deleteAllDataRelatedToDistrict = async districtID => {
+  await Promise.all([
+    mongo.db.collection(DistrictCollection).deleteOne({id: districtID}),
+    mongo.db.collection(CommonToponymCollection).deleteMany({districtID}),
+    mongo.db.collection(AddressCollection).deleteMany({districtID})
+  ])
+}
diff --git a/lib/api/helper.js b/lib/api/helper.js
index c75422f3..cd3328de 100644
--- a/lib/api/helper.js
+++ b/lib/api/helper.js
@@ -113,3 +113,35 @@ export const checkIfDistrictsExist = async districtIDs => {
     return dataValidationReportFrom(false, 'Some districts do not exist', nonExistingCommonToponymIDs)
   }
 }
+
+/**
+ * Schedule a delayed job under a fixed job ID, first removing any job already stored
+ * under that ID so that only the most recent request stays in the queue.
+ * Errors are logged and not rethrown.
+ *
+ * @param {object} queue - Queue exposing `getJob(jobId)` and `add(data, options)`
+ * @param {*} jobId - ID used both to look up the existing job and to create the new one
+ * @param {object} data - Payload of the new job
+ * @param {number} delay - Delay handed to the queue as the `delay` job option
+ * @returns {Promise<void>}
+ */
+export const addOrUpdateJob = async (queue, jobId, data, delay) => {
+  try {
+    const existingJob = await queue.getJob(jobId)
+
+    if (existingJob) {
+      // NOTE(review): if this removal rejects (presumably possible while the job is being
+      // processed — verify), the catch below logs it and the new job is never added
+      await existingJob.remove()
+    }
+
+    await queue.add(data, {
+      jobId,
+      delay,
+      removeOnComplete: true,
+      removeOnFail: true
+    })
+  } catch (error) {
+    console.error(error)
+  }
+}
diff --git a/lib/util/sequelize.js b/lib/util/sequelize.js
index 36a9c2c1..130a8e65 100644
--- a/lib/util/sequelize.js
+++ b/lib/util/sequelize.js
@@ -1,7 +1,7 @@
 import {Sequelize, DataTypes} from 'sequelize'
 
 // Create a new Sequelize instance
-const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
+export const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
   host: process.env.POSTGRES_URL,
   dialect: 'postgres',
   logging: false
diff --git a/worker.js b/worker.js
index 8680a4c4..8ebfdefd 100644
--- a/worker.js
+++ b/worker.js
@@ -4,6 +4,7 @@ import
'dotenv/config.js' // eslint-disable-line import/no-unassigned-import import ms from 'ms' import apiConsumer from './lib/api/consumers/api-consumer.js' +import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js' import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js' import mongo from './lib/util/mongo.cjs' @@ -11,10 +12,15 @@ import queue from './lib/util/queue.cjs' import composeCommune from './lib/jobs/compose-commune.cjs' import computeBanStats from './lib/jobs/compute-ban-stats.cjs' import balGarbageCollector from './lib/compose/bal-garbage-collector/index.js' +import {init} from './lib/util/sequelize.js' async function main() { + // Mongo DB : connecting and creating indexes await mongo.connect() + // Postgres DB : Testing connection and syncing models + await init() + if (process.env.NODE_ENV === 'production') { // Garbage collector await balGarbageCollector() @@ -27,6 +33,7 @@ async function main() { // BanID queue('api').process(1, apiConsumer) + queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer) queue('clean-job-status').process(1, cleanJobStatusConsumer) queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true}) }