Skip to content

Commit

Permalink
Merge pull request #461 from BaseAdresseNationale/antoineludeau/impro…
Browse files Browse the repository at this point in the history
…ve-export-with-temp-collection

[From Postgres to Mongo] : Added temp collections to lower data unavailability time
  • Loading branch information
antoineludeau authored Sep 18, 2024
2 parents a2dc030 + 787ec7e commit e013897
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 97 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ services:
- FORCE_DOWNLOAD_CONTOUR=
- FORCE_DOWNLOAD_DATASETS=
- IS_GENERATE_BANID_ON_ASSEMBLY=${IS_GENERATE_BANID_ON_ASSEMBLY}
- CP_PATH=${CP_PATH}
- DATANOVA_PATH=${DATANOVA_PATH}
ports:
- "${PORT:-5000}:5000"
volumes:
Expand Down
264 changes: 189 additions & 75 deletions lib/api/consumers/export-to-exploitation-db-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Transaction} from 'sequelize'
import {createFantoirCommune} from '@ban-team/fantoir'
import {findCodePostal} from 'codes-postaux/full.js'
import mongo from '../../util/mongo.cjs'
import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js'
import {sequelize, District, CommonToponym} from '../../util/sequelize.js'
import {derivePositionProps} from '../../util/geo.cjs'
import {createPseudoCodeVoieGenerator} from '../../pseudo-codes-voies.cjs'

Expand Down Expand Up @@ -54,7 +54,7 @@ const createAddressTempTableQuery = tempTableName => `
WHERE AV."districtID" = :districtID
`

const commonToponymPageQuery = tempTableName => `
const pageQuery = tempTableName => `
SELECT
*
FROM
Expand All @@ -63,13 +63,27 @@ const commonToponymPageQuery = tempTableName => `
LIMIT :limit
`

const addressPageQuery = tempTableName => `
const countQuery = tempTableName => `
SELECT
*
COUNT(*)
FROM
${tempTableName}
OFFSET :offset
LIMIT :limit
`

const specificCommonToponymTempTableCountQuery = tempTableName => `
SELECT
COUNT(*)
FROM
${tempTableName}
WHERE meta->'bal'->>'isLieuDit' = 'true';
`

const addressCertifiedTempTableCountQuery = tempTableName => `
SELECT
COUNT(*)
FROM
${tempTableName}
WHERE certified = TRUE;
`

export default async function exportToExploitationDB({data}) {
Expand Down Expand Up @@ -130,51 +144,78 @@ export default async function exportToExploitationDB({data}) {
return
}

// Clean collections
// Delete all data related to the district (legacy and banID)
await deleteAllLegacyDataRelatedToCOG(cog)

// Generate temporary table names based on districtID
// Setting temporary tables and collections
// Temporary table names
const tempCommonToponymTableName = `temp_common_toponym_${cog}`
const tempAddressTableName = `temp_address_${cog}`

// Drop temporary tables
try {
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
// Create temporary tables
await sequelize.query(createCommonToponymTempTableQuery(tempCommonToponymTableName), {
replacements: {districtID},
transaction,
})
console.log(`Temporary table ${tempCommonToponymTableName} created`)
await sequelize.query(createAddressTempTableQuery(tempAddressTableName), {
replacements: {districtID},
transaction,
})
console.log(`Temporary table ${tempAddressTableName} created`)
// Temporary collections names
const tempCommonToponymCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.commonToponym}_${cog}`
const tempAddressCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.address}_${cog}`
const tempDistrictCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.district}_${cog}`

// CommonToponym
// Count the total number of common toponyms and pages to process
const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / PAGE_SIZE)
// Temporary collections references
const tempCommonToponymCollection = mongo.db.collection(tempCommonToponymCollectionName)
const tempAddressCollection = mongo.db.collection(tempAddressCollectionName)
const tempDistrictCollection = mongo.db.collection(tempDistrictCollectionName)

const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
const offset = (pageNumber - 1) * PAGE_SIZE
const [pageData] = await sequelize.query(commonToponymPageQuery(tempCommonToponymTableName), {
try {
// Drop temporary tables and collections if they exist to be sure to start from a clean state
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
await deleteTempCollections([tempCommonToponymCollection, tempAddressCollection, tempDistrictCollection])

// Create temporary tables
await sequelize.query(
createCommonToponymTempTableQuery(tempCommonToponymTableName),
{
replacements: {
districtID,
offset,
limit: PAGE_SIZE
districtID
},
transaction,
raw: true,
})
// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))
await sequelize.query(
createAddressTempTableQuery(tempAddressTableName),
{
replacements: {
tempTable: tempAddressTableName,
districtID
},
transaction,
})
// CommonToponym
// Count the total number of common toponyms and pages to process
const commonToponymTempTableCountQueryResult = await sequelize.query(
countQuery(tempCommonToponymTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalCommonToponymTempTableRecordsResult = Number(commonToponymTempTableCountQueryResult?.[0]?.count)
const totalCommonToponymPages = Math.ceil(totalCommonToponymTempTableRecordsResult / PAGE_SIZE)

// Insert the data in the collection (legacy and banID)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).insertMany(formatedPageDataForLegacy, {ordered: false})
const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
try {
const offset = (pageNumber - 1) * PAGE_SIZE
const pageData = await sequelize.query(
pageQuery(tempCommonToponymTableName),
{
replacements: {
offset,
limit: PAGE_SIZE
},
type: sequelize.QueryTypes.SELECT,
transaction,
})
// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))

// Insert the data in the temp collection
await tempCommonToponymCollection.insertMany(formatedPageDataForLegacy, {ordered: false})
} catch (error) {
console.error(`Error exporting common toponym page ${pageNumber}: ${error.message}`)
throw error
}
}

const commonToponymsExportPromises = []
Expand All @@ -186,33 +227,34 @@ export default async function exportToExploitationDB({data}) {

// Address
// Count the total number of addresses and pages to process
const totalAddressRecords = await Address.count({
where: {
districtID,
isActive: true
},
transaction,
})
const addressTempTableCountQueryResult = await sequelize.query(
countQuery(tempAddressTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalAddressRecords = Number(addressTempTableCountQueryResult?.[0]?.count)
const totalAddressPages = Math.ceil(totalAddressRecords / PAGE_SIZE)

const fetchAndExportDataFromAddressPage = async pageNumber => {
const offset = (pageNumber - 1) * PAGE_SIZE
const [pageData] = await sequelize.query(addressPageQuery(tempAddressTableName), {
replacements: {
districtID,
offset,
limit: PAGE_SIZE
},
transaction,
raw: true,
})
const pageData = await sequelize.query(
pageQuery(tempAddressTableName),
{
replacements: {
offset,
limit: PAGE_SIZE
},
type: sequelize.QueryTypes.SELECT,
transaction,
})

// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(address => calculateExtraDataForAddress(address, cog, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(address => formatAddressDataForLegacy(address, district, commonToponymLegacyIDCommonToponymIDMap, addressLegacyIDSet))

// Insert the data in the collection (legacy and banID)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).insertMany(formatedPageDataForLegacy, {ordered: false})
// Insert the data in the temp collection
tempAddressCollection.insertMany(formatedPageDataForLegacy, {ordered: false})
}

const addressesExportPromises = []
Expand All @@ -223,24 +265,53 @@ export default async function exportToExploitationDB({data}) {
await Promise.all(addressesExportPromises)

// District
// For Legacy collections
const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, totalCommonToponymRecords, totalAddressRecords, transaction)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.district).updateOne({codeCommune: cog}, {$set: districtFormatedForLegacy}, {upsert: true})
// Count the total number of "lieu-dit" common toponym used for the district legacy format
const specificCommonToponymTempTableCountQueryResult = await sequelize.query(
specificCommonToponymTempTableCountQuery(tempCommonToponymTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalSpecifCommonToponymRecords = Number(specificCommonToponymTempTableCountQueryResult?.[0]?.count)

// Count the total number of certified address used for the district legacy format
const addressCertifiedTempTableCountQueryResult = await sequelize.query(
addressCertifiedTempTableCountQuery(tempAddressTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalAddressCertifiedRecords = Number(addressCertifiedTempTableCountQueryResult?.[0]?.count)

// Commit the transaction once the temporary tables are created
await transaction.commit()

// Format the district data for the legacy format
const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, {totalCommonToponymRecords, totalSpecifCommonToponymRecords, totalAddressRecords, totalAddressCertifiedRecords})

// Insert the data in the temp collection
await tempDistrictCollection.insertOne(districtFormatedForLegacy)

// Pseudo code voie generator saving data
await pseudoCodeVoieGenerator.save()

// Drop the old data
await deleteOldDataFromFinaleCollections(cog)

// Merge the temporary tables into the final collections
await mergeTempCollectionsIntoFinaleCollections(tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection)

// Drop temporary collections
await deleteTempCollections([tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection])

// Drop temporary tables
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
} catch (error) {
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
console.error(`Exporting districtID ${districtID} failed: ${error.message}`)
await deleteTempCollections([tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection])
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
throw error
}
// Commit the transaction

await transaction.commit()
console.log(`Exporting districtID ${districtID} done`)
} catch (error) {
await transaction.rollback()
Expand All @@ -251,13 +322,55 @@ export default async function exportToExploitationDB({data}) {

// Helpers

// Helpers for exploitation DB
// Helpers for the temporary tables and collections
const deleteOldDataFromFinaleCollections = async cog => {
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.district).deleteOne({codeCommune: cog})
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).deleteMany({codeCommune: cog})
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).deleteMany({codeCommune: cog})
}

const mergeTempCollectionsIntoFinaleCollections = async (tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection) => {
const collectionsToMerge = [
{tempCollection: tempDistrictCollection, finalCollectionName: EXPLOITATION_DB_COLLECTION_NAMES.district},
{tempCollection: tempCommonToponymCollection, finalCollectionName: EXPLOITATION_DB_COLLECTION_NAMES.commonToponym},
{tempCollection: tempAddressCollection, finalCollectionName: EXPLOITATION_DB_COLLECTION_NAMES.address}
]

const mergeCollection = async ({tempCollection, finalCollectionName}) => {
await tempCollection.aggregate([
{$match: {}},
{
$merge: {
into: finalCollectionName
}
}
]).toArray()
}

const promises = collectionsToMerge.map(collectionToMerge => mergeCollection(collectionToMerge))
await Promise.all(promises)
}

const deleteTempCollections = async collectionReferences => {
// Get the list of existing collections
const existingCollections = await mongo.db.listCollections().toArray()

// Extract the names of the existing collections
const existingCollectionNames = new Set(existingCollections.map(collection => collection.name))

const promises = collectionReferences.map(async collectionReference => {
// Check if the collection exists
if (existingCollectionNames.has(collectionReference.collectionName)) {
// Drop the collection if it exists
await collectionReference.drop()
}
})
await Promise.all(promises)
}

const deleteAllLegacyDataRelatedToCOG = async cog => {
await Promise.all([
mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).deleteMany({codeCommune: cog}),
mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).deleteMany({codeCommune: cog}),
])
const deleteTempTables = async tableNames => {
const promises = tableNames.map(tableName => sequelize.query(`DROP TABLE IF EXISTS ${tableName}`))
await Promise.all(promises)
}

// Helpers for calculation
Expand Down Expand Up @@ -361,6 +474,7 @@ const calculateAddressPostalCode = (commonToponymIDFantoirCodeMap, address, cog)
}
}

// Helpers to calculate the geometry and tiles
const calculateCommonToponymGeometryAndTiles = commonToponym => {
const {geometry: geometryFromCommonToponym, centroid} = commonToponym
let geometryFromCentroid
Expand Down
Loading

0 comments on commit e013897

Please sign in to comment.