Skip to content

Commit

Permalink
Added first mechanism of export to exploitation DB
Browse files Browse the repository at this point in the history
  • Loading branch information
antoineludeau committed Aug 29, 2023
1 parent 3ec7410 commit b336869
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 5 deletions.
5 changes: 5 additions & 0 deletions lib/api/address/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ export async function deleteAddress(addressID) {
/**
 * Deletes every address whose id is listed in `addressIDs`.
 *
 * @param {Array} addressIDs - IDs of the addresses to delete
 * @returns {Promise} whatever `Address.destroy` resolves to
 */
export async function deleteAddresses(addressIDs) {
  const where = {id: addressIDs}
  return Address.destroy({where})
}

/**
 * Looks up the district of each address listed in `addressIDs`.
 *
 * Only the `districtID` attribute is fetched. The result holds one entry per
 * row returned by the query, so duplicates are not removed here.
 *
 * @param {Array} addressIDs - IDs of the addresses to look up
 * @returns {Promise<Array>} the `districtID` of every returned row
 */
export async function getAllDistrictIDsFromAddresses(addressIDs) {
  const rows = await Address.findAll({
    where: {id: addressIDs},
    attributes: ['districtID'],
    raw: true,
  })

  return rows.map(({districtID}) => districtID)
}
5 changes: 5 additions & 0 deletions lib/api/common-toponym/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ export async function deleteCommonToponym(commonToponymID) {
/**
 * Deletes every common toponym whose id is listed in `commonToponymIDs`.
 *
 * @param {Array} commonToponymIDs - IDs of the common toponyms to delete
 * @returns {Promise} whatever `CommonToponym.destroy` resolves to
 */
export async function deleteCommonToponyms(commonToponymIDs) {
  const where = {id: commonToponymIDs}
  return CommonToponym.destroy({where})
}

/**
 * Looks up the district of each common toponym listed in `commonToponymIDs`.
 *
 * Only the `districtID` attribute is fetched. The result holds one entry per
 * row returned by the query, so duplicates are not removed here.
 *
 * @param {Array} commonToponymIDs - IDs of the common toponyms to look up
 * @returns {Promise<Array>} the `districtID` of every returned row
 */
export async function getAllDistrictIDsFromCommonToponyms(commonToponymIDs) {
  const rows = await CommonToponym.findAll({
    where: {id: commonToponymIDs},
    attributes: ['districtID'],
    raw: true,
  })

  return rows.map(({districtID}) => districtID)
}
71 changes: 67 additions & 4 deletions lib/api/consumers/api-consumer.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import {setJobStatus} from '../job-status/models.js'
import {setAddresses, updateAddresses, deleteAddresses} from '../address/models.js'
import queue from '../../util/queue.cjs'
import {getJobStatus, setJobStatus} from '../job-status/models.js'
import {setAddresses, updateAddresses, deleteAddresses, getAllDistrictIDsFromAddresses} from '../address/models.js'
import {checkAddressesRequest, checkAddressesIDsRequest} from '../address/utils.js'
import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms} from '../common-toponym/models.js'
import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms, getAllDistrictIDsFromCommonToponyms} from '../common-toponym/models.js'
import {checkCommonToponymsRequest, checkCommonToponymsIDsRequest} from '../common-toponym/utils.js'
import {setDistricts, updateDistricts, deleteDistricts} from '../district/models.js'
import {checkDistrictsRequest, checkDistrictsIDsRequest} from '../district/utils.js'
import {dataValidationReportFrom} from '../helper.js'
import {dataValidationReportFrom, addOrUpdateJob} from '../helper.js'

// Queue that receives one export job per district whose data was changed by an API job
const exportToExploitationDBQueue = queue('export-to-exploitation-db')
// Delay handed to addOrUpdateJob for every export job.
// NOTE(review): presumably milliseconds (i.e. 10 s), as the value is forwarded as the queue's `delay` option — confirm against the queue library
const exportToExploitationDBJobDelay = 10_000

export default async function apiConsumers({data: {dataType, jobType, data, statusID}}, done) {
try {
Expand All @@ -22,6 +26,22 @@ export default async function apiConsumers({data: {dataType, jobType, data, stat
default:
console.warn(`Consumer Warn: Unknown data type : '${dataType}'`)
}

const jobStatus = await getJobStatus(statusID)
if (jobStatus.status === 'success') {
// Export data from the postgresql database to the exploitation database
const relatedDistrictIDs = await extractRelatedDistrictIDs(dataType, jobType, data)
const uniqueRelatedDistrictIDs = [...new Set(relatedDistrictIDs)]
const addOrUpdateJobPromises = uniqueRelatedDistrictIDs.map(async districtID => {
await addOrUpdateJob(
exportToExploitationDBQueue,
districtID,
{districtID},
exportToExploitationDBJobDelay
)
})
await Promise.all(addOrUpdateJobPromises)
}
} catch (error) {
console.error(error)
await setJobStatus(statusID, {
Expand Down Expand Up @@ -178,3 +198,46 @@ const districtConsumer = async (jobType, payload, statusID) => {
})
}
}

/**
 * Resolves the IDs of the districts touched by an API job.
 *
 * - For `insert` and `update` jobs the payload is a list of objects and their
 *   `id` property is used; for `delete` jobs the payload is already a list of IDs.
 * - For `address` and `commonToponym` data the district IDs are looked up from
 *   those entity IDs; for `district` data the IDs are the district IDs themselves.
 *
 * The result is not de-duplicated. Unknown data types or job types are reported
 * with a warning and resolve to an empty array, so callers always get an array.
 *
 * NOTE(review): for `delete` jobs on addresses / common toponyms the districts
 * are looked up from rows identified by the deleted IDs; if the caller has
 * already removed those rows the lookup presumably returns nothing — confirm
 * against the call site.
 *
 * @param {string} dataType - 'address', 'commonToponym' or 'district'
 * @param {string} jobType - 'insert', 'update' or 'delete'
 * @param {Array} payload - objects carrying an `id` (insert/update) or raw IDs (delete)
 * @returns {Promise<Array>} related district IDs (possibly empty, possibly with duplicates)
 */
export const extractRelatedDistrictIDs = async (dataType, jobType, payload) => {
  // Pick how entity IDs are turned into district IDs for this data type
  let resolveDistrictIDs
  switch (dataType) {
    case 'address':
      resolveDistrictIDs = addressIDs => getAllDistrictIDsFromAddresses(addressIDs)
      break
    case 'commonToponym':
      resolveDistrictIDs = commonToponymIDs => getAllDistrictIDsFromCommonToponyms(commonToponymIDs)
      break
    case 'district':
      resolveDistrictIDs = districtIDs => districtIDs
      break
    default:
      console.warn(`Unknown data type : '${dataType}'`)
      return []
  }

  // Pick how entity IDs are read from the payload for this job type
  switch (jobType) {
    case 'insert':
    case 'update':
      return resolveDistrictIDs(payload.map(({id}) => id))
    case 'delete':
      return resolveDistrictIDs(payload)
    default:
      console.warn(`Unknown job type : '${jobType}'`)
      return []
  }
}
102 changes: 102 additions & 0 deletions lib/api/consumers/export-to-exploitation-db-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js'
import mongo from '../../util/mongo.cjs'

// Names of the MongoDB collections the export writes to (one per entity type)
const DistrictCollection = 'districts'
const CommonToponymCollection = 'common_toponyms'
const AddressCollection = 'addresses'

// Number of rows read from the SQL database and inserted into MongoDB per page
const pageSize = 100

/**
 * Queue consumer that copies one district, together with its common toponyms
 * and addresses, from the SQL database into the MongoDB exploitation database.
 *
 * Steps, all SQL reads sharing a single transaction:
 * 1. load the district (the export is abandoned if it does not exist);
 * 2. delete everything MongoDB currently holds for that district;
 * 3. insert the district, then its common toponyms and addresses page by page.
 *
 * Errors are logged and the transaction is rolled back; the function itself
 * never rejects because of an export failure.
 *
 * @param {object} job - queue job
 * @param {object} job.data - job payload
 * @param {*} job.data.districtID - ID of the district to export
 * @returns {Promise<void>}
 */
export default async function exportToExploitationDB({data}) {
  const transaction = await sequelize.transaction()
  try {
    const {districtID} = data
    console.log(`Exporting districtID ${districtID} to exploitation DB...`)
    const district = await District.findOne({
      where: {id: districtID},
      transaction,
      raw: true,
    })

    if (!district) {
      throw new Error(`District with ID ${districtID} not found.`)
    }

    // District
    // Delete all data related to the district
    await deleteAllDataRelatedToDistrict(districtID)

    // Insert the district
    await mongo.db.collection(DistrictCollection).insertOne(district)

    // CommonToponym
    await exportDistrictRecordsByPage(CommonToponym, CommonToponymCollection, districtID, transaction)

    // Address
    await exportDistrictRecordsByPage(Address, AddressCollection, districtID, transaction)

    await transaction.commit()
    console.log(`Exporting districtID ${districtID} done`)
  } catch (error) {
    await transaction.rollback()
    console.error(error)
  }
}

/**
 * Copies, page by page, every row of `model` belonging to `districtID` into
 * the MongoDB collection `collectionName`.
 *
 * A failing page is logged and skipped so the remaining pages are still
 * attempted (best effort).
 *
 * @param {object} model - Sequelize model exposing `count` and `findAll`
 * @param {string} collectionName - target MongoDB collection
 * @param {*} districtID - district whose rows are exported
 * @param {object} transaction - Sequelize transaction used for every read
 * @returns {Promise<void>}
 */
async function exportDistrictRecordsByPage(model, collectionName, districtID, transaction) {
  const totalRecords = await model.count({
    where: {districtID},
    transaction,
  })
  const totalPages = Math.ceil(totalRecords / pageSize)

  for (let pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
    const offset = (pageNumber - 1) * pageSize

    try {
      // eslint-disable-next-line no-await-in-loop
      const pageData = await model.findAll({
        // Same filter as the count above: without it the pages would walk the
        // whole table and export rows belonging to other districts
        where: {districtID},
        order: [['id', 'ASC']],
        offset,
        limit: pageSize,
        transaction,
        lock: transaction.LOCK.UPDATE, // Apply an update lock for consistency
        raw: true,
      })

      // A page can come back empty if rows disappeared after the count;
      // skip it rather than sending an empty batch to MongoDB
      if (pageData.length > 0) {
        // eslint-disable-next-line no-await-in-loop
        await mongo.db.collection(collectionName).insertMany(pageData)
      }
    } catch (error) {
      console.error(error)
    }
  }
}

/**
 * Removes from MongoDB the district document whose `id` is `districtID`, plus
 * every common toponym and address document carrying that `districtID`.
 * The three deletions are started together and awaited as a group.
 *
 * @param {*} districtID - ID of the district to purge
 * @returns {Promise<void>}
 */
const deleteAllDataRelatedToDistrict = async districtID => {
  const districtRemoval = mongo.db.collection(DistrictCollection).deleteOne({id: districtID})
  const commonToponymsRemoval = mongo.db.collection(CommonToponymCollection).deleteMany({districtID})
  const addressesRemoval = mongo.db.collection(AddressCollection).deleteMany({districtID})

  await Promise.all([districtRemoval, commonToponymsRemoval, addressesRemoval])
}
19 changes: 19 additions & 0 deletions lib/api/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,22 @@ export const checkIfDistrictsExist = async districtIDs => {
return dataValidationReportFrom(false, 'Some districts do not exist', nonExistingCommonToponymIDs)
}
}

/**
 * Schedules a job under a fixed `jobId`, replacing any job already stored
 * under that ID: the existing job (if any) is removed first, then a new one is
 * added with the given delay. The job is configured to be removed from the
 * queue once it completes or fails.
 *
 * Any error raised while looking up, removing or adding the job is logged and
 * not propagated.
 *
 * @param {object} queue - queue exposing `getJob(jobId)` and `add(data, options)`
 * @param {*} jobId - identifier shared by the replaced and the new job
 * @param {object} data - payload of the new job
 * @param {number} delay - value forwarded as the job's `delay` option
 * @returns {Promise<void>}
 */
export const addOrUpdateJob = async (queue, jobId, data, delay) => {
  const jobOptions = {
    jobId,
    delay,
    removeOnComplete: true,
    removeOnFail: true,
  }

  try {
    const previousJob = await queue.getJob(jobId)
    if (previousJob) {
      await previousJob.remove()
    }

    await queue.add(data, jobOptions)
  } catch (error) {
    console.error(error)
  }
}
2 changes: 1 addition & 1 deletion lib/util/sequelize.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Sequelize, DataTypes} from 'sequelize'

// Create a new Sequelize instance
const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
export const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
host: process.env.POSTGRES_URL,
dialect: 'postgres',
logging: false
Expand Down
7 changes: 7 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import
import ms from 'ms'

import apiConsumer from './lib/api/consumers/api-consumer.js'
import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js'
import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js'

import mongo from './lib/util/mongo.cjs'
import queue from './lib/util/queue.cjs'
import composeCommune from './lib/jobs/compose-commune.cjs'
import computeBanStats from './lib/jobs/compute-ban-stats.cjs'
import balGarbageCollector from './lib/compose/bal-garbage-collector/index.js'
import {init} from './lib/util/sequelize.js'

async function main() {
// Mongo DB : connecting and creating indexes
await mongo.connect()

// Postgres DB : Testing connection and syncing models
await init()

if (process.env.NODE_ENV === 'production') {
// Garbage collector
await balGarbageCollector()
Expand All @@ -27,6 +33,7 @@ async function main() {

// BanID
queue('api').process(1, apiConsumer)
queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer)
queue('clean-job-status').process(1, cleanJobStatusConsumer)
queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
}
Expand Down

0 comments on commit b336869

Please sign in to comment.