Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First mechanism of export from internal DB (postgres) to exploitation DB (mongo) #251

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ BAN_API_URL=https://plateforme.adresse.data.gouv.fr/api
ADMIN_TOKEN= # Used for legacy routes
BAN_API_AUTHORIZED_TOKENS= # Used for new ban-id api routes
PORT=5000
EXPORT_TO_EXPLOITATION_DB_JOB_DELAY=10000 # Time in ms during which an export job is delayed before being processed

# API de dépôt
API_DEPOT_URL=https://plateforme.adresse.data.gouv.fr/api-depot
Expand Down
5 changes: 5 additions & 0 deletions lib/api/address/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ export async function deleteAddress(addressID) {
export async function deleteAddresses(addressIDs) {
return Address.destroy({where: {id: addressIDs}})
}

/**
 * Looks up the district ID of every address matching the given IDs.
 *
 * @param {Array} addressIDs - Address IDs used as the `id` filter of the query.
 * @returns {Promise<Array>} One `districtID` per row returned by the query (duplicates are kept).
 */
export async function getAllDistrictIDsFromAddresses(addressIDs) {
  // Only the `districtID` column is requested, as plain objects (`raw: true`)
  const rows = await Address.findAll({
    where: {id: addressIDs},
    attributes: ['districtID'],
    raw: true,
  })

  const districtIDs = []
  for (const {districtID} of rows) {
    districtIDs.push(districtID)
  }

  return districtIDs
}
5 changes: 5 additions & 0 deletions lib/api/common-toponym/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ export async function deleteCommonToponym(commonToponymID) {
export async function deleteCommonToponyms(commonToponymIDs) {
return CommonToponym.destroy({where: {id: commonToponymIDs}})
}

/**
 * Looks up the district ID of every common toponym matching the given IDs.
 *
 * @param {Array} commonToponymIDs - Common toponym IDs used as the `id` filter of the query.
 * @returns {Promise<Array>} One `districtID` per row returned by the query (duplicates are kept).
 */
export async function getAllDistrictIDsFromCommonToponyms(commonToponymIDs) {
  // Only the `districtID` column is requested, as plain objects (`raw: true`)
  const rows = await CommonToponym.findAll({
    where: {id: commonToponymIDs},
    attributes: ['districtID'],
    raw: true,
  })

  const districtIDs = []
  for (const {districtID} of rows) {
    districtIDs.push(districtID)
  }

  return districtIDs
}
71 changes: 67 additions & 4 deletions lib/api/consumers/api-consumer.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import {setJobStatus} from '../job-status/models.js'
import {setAddresses, updateAddresses, deleteAddresses} from '../address/models.js'
import queue from '../../util/queue.cjs'
import {getJobStatus, setJobStatus} from '../job-status/models.js'
import {setAddresses, updateAddresses, deleteAddresses, getAllDistrictIDsFromAddresses} from '../address/models.js'
import {checkAddressesRequest, checkAddressesIDsRequest} from '../address/utils.js'
import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms} from '../common-toponym/models.js'
import {setCommonToponyms, updateCommonToponyms, deleteCommonToponyms, getAllDistrictIDsFromCommonToponyms} from '../common-toponym/models.js'
import {checkCommonToponymsRequest, checkCommonToponymsIDsRequest} from '../common-toponym/utils.js'
import {setDistricts, updateDistricts, deleteDistricts} from '../district/models.js'
import {checkDistrictsRequest, checkDistrictsIDsRequest} from '../district/utils.js'
import {dataValidationReportFrom} from '../helper.js'
import {dataValidationReportFrom, addOrUpdateJob} from '../helper.js'

// Queue receiving one export job per district impacted by an API job
const exportToExploitationDBQueue = queue('export-to-exploitation-db')
// Environment variables are always strings: parse the delay (in ms) into a number so the
// queue receives a numeric delay, and fall back to 10 seconds when it is unset or unparsable.
const parsedExportToExploitationDBJobDelay = Number.parseInt(process.env.EXPORT_TO_EXPLOITATION_DB_JOB_DELAY, 10)
const exportToExploitationDBJobDelay = Number.isNaN(parsedExportToExploitationDBJobDelay)
  ? 10_000
  : parsedExportToExploitationDBJobDelay

export default async function apiConsumers({data: {dataType, jobType, data, statusID}}, done) {
try {
Expand All @@ -22,6 +26,22 @@ export default async function apiConsumers({data: {dataType, jobType, data, stat
default:
console.warn(`Consumer Warn: Unknown data type : '${dataType}'`)
}

const jobStatus = await getJobStatus(statusID)
if (jobStatus.status === 'success') {
// Export data from the postgresql database to the exploitation database
const relatedDistrictIDs = await extractRelatedDistrictIDs(dataType, jobType, data)
const uniqueRelatedDistrictIDs = [...new Set(relatedDistrictIDs)]
const addOrUpdateJobPromises = uniqueRelatedDistrictIDs.map(async districtID => {
await addOrUpdateJob(
exportToExploitationDBQueue,
districtID,
{districtID},
exportToExploitationDBJobDelay
)
})
await Promise.all(addOrUpdateJobPromises)
}
} catch (error) {
console.error(error)
await setJobStatus(statusID, {
Expand Down Expand Up @@ -178,3 +198,46 @@ const districtConsumer = async (jobType, payload, statusID) => {
})
}
}

/**
 * Collects the IDs of the districts concerned by an API job payload.
 *
 * - `insert` / `update` payloads are lists of objects carrying an `id`.
 * - `delete` payloads are lists of IDs.
 * - For `address` and `commonToponym`, the district IDs are looked up from those IDs;
 *   for `district`, the IDs are the district IDs themselves.
 *
 * Unknown data types or job types are reported with a warning and yield an empty list,
 * so the function always resolves to an array.
 *
 * NOTE(review): for `address` / `commonToponym` deletions the lookup queries rows by ID;
 * if those rows are already deleted when this runs, no district is found — confirm against the caller.
 *
 * @param {string} dataType - 'address', 'commonToponym' or 'district'.
 * @param {string} jobType - 'insert', 'update' or 'delete'.
 * @param {Array} payload - Job payload (see above for its shape per job type).
 * @returns {Promise<Array>} Related district IDs (possibly with duplicates).
 */
export const extractRelatedDistrictIDs = async (dataType, jobType, payload) => {
  // Resolvers are wrapped in arrow functions so that each lookup is only referenced when used
  const districtIDsResolvers = new Map([
    ['address', ids => getAllDistrictIDsFromAddresses(ids)],
    ['commonToponym', ids => getAllDistrictIDsFromCommonToponyms(ids)],
    ['district', ids => ids],
  ])

  const resolveDistrictIDs = districtIDsResolvers.get(dataType)
  if (!resolveDistrictIDs) {
    console.warn(`Unknown data type : '${dataType}'`)
    return []
  }

  switch (jobType) {
    case 'insert':
    case 'update':
      return resolveDistrictIDs(payload.map(({id}) => id))
    case 'delete':
      return resolveDistrictIDs(payload)
    default:
      console.warn(`Unknown job type : '${jobType}'`)
      return []
  }
}
101 changes: 101 additions & 0 deletions lib/api/consumers/export-to-exploitation-db-consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import {Transaction} from 'sequelize'
import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js'
import mongo from '../../util/mongo.cjs'

// Names of the Mongo collections written by the export
const DistrictCollection = 'districts'
const CommonToponymCollection = 'common_toponyms'
const AddressCollection = 'addresses'

// Number of rows read per query (and inserted per batch) when exporting
const pageSize = 100

/**
 * Queue consumer: rebuilds the exploitation DB (Mongo) documents of one district
 * from the internal DB (Postgres).
 *
 * Steps: read the district, delete its existing Mongo documents, insert the district,
 * then copy its common toponyms and its addresses page by page (`pageSize` rows per page).
 *
 * Export failures are logged and not rethrown, so the returned promise still resolves.
 * Only the Postgres transaction is rolled back: the Mongo calls made here are not bound to
 * it, so documents already deleted or inserted are left as they are.
 *
 * @param {object} job - Queue job.
 * @param {object} job.data - Job payload.
 * @param {*} job.data.districtID - ID of the district to export.
 * @returns {Promise<void>} Rejects only if rolling back the Postgres transaction fails.
 */
export default async function exportToExploitationDB({data}) {
  const {districtID} = data
  console.log(`Exporting districtID ${districtID} to exploitation DB...`)
  // Every read below goes through this single REPEATABLE_READ transaction, which is
  // intended to give the export a consistent view of the district while it is copied.
  const transaction = await sequelize.transaction({
    isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
  })

  // Reads one page of `model` rows belonging to the district and inserts it into `collection`
  const fetchAndExportDataFromPage = async (model, collection, pageNumber) => {
    const offset = (pageNumber - 1) * pageSize
    const pageData = await model.findAll({
      where: {districtID},
      order: [['id', 'ASC']], // Stable ordering so that pages do not overlap
      offset,
      limit: pageSize,
      transaction,
      raw: true,
    })

    // Nothing to insert: do not send an empty batch to Mongo
    if (pageData.length === 0) {
      return
    }

    await mongo.db.collection(collection).insertMany(pageData, {ordered: false})
  }

  // Counts the district's rows of `model`, then exports all of its pages concurrently
  const exportAllPages = async (model, collection) => {
    const totalRecords = await model.count({
      where: {districtID},
      transaction,
    })
    const totalPages = Math.ceil(totalRecords / pageSize)

    const exportPromises = []
    for (let pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
      exportPromises.push(fetchAndExportDataFromPage(model, collection, pageNumber))
    }

    await Promise.all(exportPromises)
  }

  try {
    const district = await District.findOne({
      where: {id: districtID},
      transaction,
      raw: true,
    })

    if (!district) {
      throw new Error(`District with ID ${districtID} not found.`)
    }

    // District
    // Delete all data related to the district
    await deleteAllDataRelatedToDistrict(districtID)

    // Insert the district
    await mongo.db.collection(DistrictCollection).insertOne(district)

    // CommonToponym
    await exportAllPages(CommonToponym, CommonToponymCollection)

    // Address
    await exportAllPages(Address, AddressCollection)

    await transaction.commit()
    console.log(`Exporting districtID ${districtID} done`)
  } catch (error) {
    // Log first: if the rollback itself fails, the root cause has still been reported
    console.error(`Exporting districtID ${districtID} failed: ${error.message}`)
    await transaction.rollback()
  }
}

/**
 * Removes from Mongo the district document and every common toponym and address
 * document attached to the given district. The three deletions run concurrently.
 *
 * @param {*} districtID - ID of the district whose documents are removed.
 * @returns {Promise<void>}
 */
const deleteAllDataRelatedToDistrict = async districtID => {
  const districtDeletion = mongo.db.collection(DistrictCollection).deleteOne({id: districtID})
  const commonToponymsDeletion = mongo.db.collection(CommonToponymCollection).deleteMany({districtID})
  const addressesDeletion = mongo.db.collection(AddressCollection).deleteMany({districtID})

  await Promise.all([districtDeletion, commonToponymsDeletion, addressesDeletion])
}
19 changes: 19 additions & 0 deletions lib/api/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,22 @@ export const checkIfDistrictsExist = async districtIDs => {
return dataValidationReportFrom(false, 'Some districts do not exist', nonExistingCommonToponymIDs)
}
}

/**
 * Schedules a delayed job under a fixed job ID, replacing any job already stored
 * under that ID. The job is removed from the queue once completed or failed.
 *
 * Any error (lookup, removal or insertion) is logged and not rethrown.
 *
 * @param {object} queue - Queue exposing `getJob(jobId)` and `add(data, options)`.
 * @param {*} jobId - ID identifying the job in the queue.
 * @param {object} data - Job payload.
 * @param {number} delay - Delay passed to the queue as the job's `delay` option.
 * @returns {Promise<void>}
 */
export const addOrUpdateJob = async (queue, jobId, data, delay) => {
  const jobOptions = {
    jobId,
    delay,
    removeOnComplete: true,
    removeOnFail: true
  }

  try {
    // Drop the job currently registered under this ID, if any, before re-adding it
    const previousJob = await queue.getJob(jobId)
    if (previousJob) {
      await previousJob.remove()
    }

    await queue.add(data, jobOptions)
  } catch (error) {
    console.error(error)
  }
}
2 changes: 1 addition & 1 deletion lib/util/sequelize.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Sequelize, DataTypes} from 'sequelize'

// Create a new Sequelize instance
const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
export const sequelize = new Sequelize(process.env.POSTGRES_DBNAME, process.env.POSTGRES_USER, process.env.POSTGRES_PASSWORD, {
host: process.env.POSTGRES_URL,
dialect: 'postgres',
logging: false
Expand Down
7 changes: 7 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ import 'dotenv/config.js' // eslint-disable-line import/no-unassigned-import
import ms from 'ms'

import apiConsumer from './lib/api/consumers/api-consumer.js'
import exportToExploitationDBConsumer from './lib/api/consumers/export-to-exploitation-db-consumer.js'
import cleanJobStatusConsumer from './lib/api/consumers/clean-job-status-consumer.js'

import mongo from './lib/util/mongo.cjs'
import queue from './lib/util/queue.cjs'
import composeCommune from './lib/jobs/compose-commune.cjs'
import computeBanStats from './lib/jobs/compute-ban-stats.cjs'
import balGarbageCollector from './lib/compose/bal-garbage-collector/index.js'
import {init} from './lib/util/sequelize.js'

async function main() {
// Mongo DB : connecting and creating indexes
await mongo.connect()

// Postgres DB : Testing connection and syncing models
await init()

if (process.env.NODE_ENV === 'production') {
// Garbage collector
await balGarbageCollector()
Expand All @@ -27,6 +33,7 @@ async function main() {

// BanID
queue('api').process(1, apiConsumer)
queue('export-to-exploitation-db').process(1, exportToExploitationDBConsumer)
queue('clean-job-status').process(1, cleanJobStatusConsumer)
queue('clean-job-status').add({}, {jobId: 'cleanJobStatusJobId', repeat: {every: ms('1d')}, removeOnComplete: true})
}
Expand Down