diff --git a/lib/api/address/models.js b/lib/api/address/models.js index a785df4f..763b5c93 100644 --- a/lib/api/address/models.js +++ b/lib/api/address/models.js @@ -35,3 +35,8 @@ export async function deleteAddress(addressID) { export async function deleteAddresses(addressIDs) { return Address.destroy({where: {id: addressIDs}}) } + +export async function getAllDistrictIDsFromAddresses(addressIDs) { + const addresses = await Address.findAll({where: {id: addressIDs}, attributes: ['districtID'], raw: true}) + return addresses.map(address => address.districtID) +} diff --git a/lib/api/common-toponym/models.js b/lib/api/common-toponym/models.js index 1cc6b9f4..b80f815c 100644 --- a/lib/api/common-toponym/models.js +++ b/lib/api/common-toponym/models.js @@ -35,3 +35,8 @@ export async function deleteCommonToponym(commonToponymID) { export async function deleteCommonToponyms(commonToponymIDs) { return CommonToponym.destroy({where: {id: commonToponymIDs}}) } + +export async function getAllDistrictIDsFromCommonToponyms(commonToponymIDs) { + const commonToponyms = await CommonToponym.findAll({where: {id: commonToponymIDs}, attributes: ['districtID'], raw: true}) + return commonToponyms.map(commonToponym => commonToponym.districtID) +} diff --git a/lib/api/consumers/api-consumer.js b/lib/api/consumers/api-consumer.js index 672cb6a4..ce5138c3 100644 --- a/lib/api/consumers/api-consumer.js +++ b/lib/api/consumers/api-consumer.js @@ -1,11 +1,15 @@ -import {setJobStatus} from '../job-status/models.js' -import {setAddresses, updateAddresses, deleteAddresses} from '../address/models.js' +import queue from '../../util/queue.cjs' +import {getJobStatus, setJobStatus} from '../job-status/models.js' +import {setAddresses, updateAddresses, deleteAddresses, getAllDistrictIDsFromAddresses} from '../address/models.js' import {checkAddressesRequest, checkAddressesIDsRequest} from '../address/utils.js' -import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms} from '../common-toponym/models.js' +import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms, getAllDistrictIDsFromCommonToponyms} from '../common-toponym/models.js' import {checkCommonToponymsRequest, checkCommonToponymsIDsRequest} from '../common-toponym/utils.js' import {setDistricts, updateDistricts, deleteDistricts} from '../district/models.js' import {checkDistrictsRequest, checkDistrictsIDsRequest} from '../district/utils.js' -import {dataValidationReportFrom} from '../helper.js' +import {dataValidationReportFrom, addOrUpdateJob} from '../helper.js' + +const exportToExploitationDBQueue = queue('export-to-exploitation-db') +const exportToExploitationDBJobDelay = 10_000 export default async function apiConsumers({data: {dataType, jobType, data, statusID}}, done) { try { @@ -22,6 +26,22 @@ export default async function apiConsumers({data: {dataType, jobType, data, stat default: console.warn(`Consumer Warn: Unknown data type : '${dataType}'`) } + + const jobStatus = await getJobStatus(statusID) + if (jobStatus.status === 'success') { + // Export data from the postgresql database to the exploitation database + const relatedDistrictIDs = await extractRelatedDistrictIDs(dataType, jobType, data) + const uniqueRelatedDistrictIDs = [...new Set(relatedDistrictIDs)] + const addOrUpdateJobPromises = uniqueRelatedDistrictIDs.map(async districtID => { + await addOrUpdateJob( + exportToExploitationDBQueue, + districtID, + {districtID}, + exportToExploitationDBJobDelay + ) + }) + await Promise.all(addOrUpdateJobPromises) + } } catch (error) { console.error(error) await setJobStatus(statusID, { @@ -178,3 +198,46 @@ const districtConsumer = async (jobType, payload, statusID) => { }) } } + +export const extractRelatedDistrictIDs = async (dataType, jobType, payload) => { + switch (dataType) { + case 'address': + switch (jobType) { + case 'insert': + case 'update': + return getAllDistrictIDsFromAddresses(payload.map(({id}) => id)) + case 'delete': + return getAllDistrictIDsFromAddresses(payload) + default: + console.warn(`Unknown job type : '${jobType}'`) + } + + break + case 'commonToponym': + switch (jobType) { + case 'insert': + case 'update': + return getAllDistrictIDsFromCommonToponyms(payload.map(({id}) => id)) + case 'delete': + return getAllDistrictIDsFromCommonToponyms(payload) + default: + console.warn(`Unknown job type : '${jobType}'`) + } + + break + case 'district': + switch (jobType) { + case 'insert': + case 'update': + return payload.map(({id}) => id) + case 'delete': + return payload + default: + console.warn(`Unknown job type : '${jobType}'`) + } + + break + default: + console.warn(`Unknown data type : '${dataType}'`) + } +} diff --git a/lib/api/consumers/export-to-exploitation-db-consumer.js b/lib/api/consumers/export-to-exploitation-db-consumer.js new file mode 100644 index 00000000..076e4b9f --- /dev/null +++ b/lib/api/consumers/export-to-exploitation-db-consumer.js @@ -0,0 +1,102 @@ +import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js' +import mongo from '../../util/mongo.cjs' + +const DistrictCollection = 'districts' +const CommonToponymCollection = 'common_toponyms' +const AddressCollection = 'addresses' + +const pageSize = 100 + +export default async function exportToExploitationDB({data}) { + const transaction = await sequelize.transaction() + try { + const {districtID} = data + console.log(`Exporting districtID ${districtID} to exploitation DB...`) + const district = await District.findOne({ + where: {id: districtID}, + transaction, + raw: true, + }) + + if (!district) { + throw new Error(`District with ID ${districtID} not found.`) + } + + // District + // Delete all data related to the district + await deleteAllDataRelatedToDistrict(districtID) + + // Insert the district + await mongo.db.collection(DistrictCollection).insertOne(district) + + // CommonToponym + const totalCommonToponymRecords = await CommonToponym.count({ + where: {districtID}, + transaction, + }) + const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / pageSize) + + for (let pageNumber = 1; pageNumber <= totalCommonToponymPages; pageNumber++) { + const offset = (pageNumber - 1) * pageSize + + try { + // eslint-disable-next-line no-await-in-loop + const pageData = await CommonToponym.findAll({ + order: [['id', 'ASC']], + offset, + limit: pageSize, + transaction, + lock: transaction.LOCK.UPDATE, // Apply an update lock for consistency + raw: true, + }) + // Insert the common toponyms from the related page + // eslint-disable-next-line no-await-in-loop + await mongo.db.collection(CommonToponymCollection).insertMany(pageData) + } catch (error) { + console.error(error) + } + } + + // Address + const totalAddressRecords = await Address.count({ + where: {districtID}, + transaction, + }) + const totalAddressPages = Math.ceil(totalAddressRecords / pageSize) + + for (let pageNumber = 1; pageNumber <= totalAddressPages; pageNumber++) { + const offset = (pageNumber - 1) * pageSize + + try { + // eslint-disable-next-line no-await-in-loop + const pageData = await Address.findAll({ + order: [['id', 'ASC']], + offset, + limit: pageSize, + transaction, + lock: transaction.LOCK.UPDATE, // Apply an update lock for consistency + raw: true, + }) + // Insert the addresses from the related page + // eslint-disable-next-line no-await-in-loop + await mongo.db.collection(AddressCollection).insertMany(pageData) + } catch (error) { + console.error(error) + } + } + + await transaction.commit() + console.log(`Exporting districtID ${districtID} done`) + } catch (error) { + await transaction.rollback() + console.error(error) + } +} + +const deleteAllDataRelatedToDistrict = async districtID => { + await Promise.all([ + mongo.db.collection(DistrictCollection).deleteOne({id: districtID}), + mongo.db.collection(CommonToponymCollection).deleteMany({districtID}), + mongo.db.collection(AddressCollection).deleteMany({districtID}) + ]) +} diff --git a/lib/api/helper.js b/lib/api/helper.js index c75422f3..cd3328de 100644 --- a/lib/api/helper.js +++ b/lib/api/helper.js @@ -113,3 +113,22 @@ export const checkIfDistrictsExist = async districtIDs => { return dataValidationReportFrom(false, 'Some districts do not exist', nonExistingCommonToponymIDs) } } + +export const addOrUpdateJob = async (queue, jobId, data, delay) => { + try { + const existingJob = await queue.getJob(jobId) + + if (existingJob) { + await existingJob.remove() + } + + await queue.add(data, { + jobId, + delay, + removeOnComplete: true, + removeOnFail: true + }) + } catch (error) { + console.error(error) + } +} diff --git a/lib/util/sequelize.js b/lib/util/sequelize.js index 36a9c2c1..130a8e65 100644 --- a/lib/util/sequelize.js +++ b/lib/util/sequelize.js @@ -1,7 +1,7 @@ import {Sequelize, DataTypes} from 'sequelize' // Create a new Sequelize instance -const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, { +export const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, { host: process.env.POSTGRES_URL, dialect: 'postgres', logging: false diff --git a/worker.js b/worker.js index 8680a4c4..8ebfdefd 100644 --- a/worker.js +++ b/worker.js @@ -4,6 +4,7 @@ import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import import ms from 'ms' import apiConsumer from './lib/api/consumers/api-consumer.js' +import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js' import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js' import mongo from './lib/util/mongo.cjs' @@ -11,10 +12,15 @@ import queue from './lib/util/queue.cjs' import composeCommune from './lib/jobs/compose-commune.cjs' import computeBanStats from './lib/jobs/compute-ban-stats.cjs' import balGarbageCollector from './lib/compose/bal-garbage-collector/index.js' +import {init} from './lib/util/sequelize.js' async function main() { + // Mongo DB : connecting and creating indexes await mongo.connect() + // Postgres DB : Testing connection and syncing models + await init() + if (process.env.NODE_ENV === 'production') { // Garbage collector await balGarbageCollector() @@ -27,6 +33,7 @@ async function main() { // BanID queue('api').process(1, apiConsumer) + queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer) queue('clean-job-status').process(1, cleanJobStatusConsumer) queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true}) }