Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[From Postgres to Mongo] : Added temp collections to lower data unavailability time #461

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ services:
- FORCE_DOWNLOAD_CONTOUR=
- FORCE_DOWNLOAD_DATASETS=
- IS_GENERATE_BANID_ON_ASSEMBLY=${IS_GENERATE_BANID_ON_ASSEMBLY}
- CP_PATH=${CP_PATH}
- DATANOVA_PATH=${DATANOVA_PATH}
ports:
- "${PORT:-5000}:5000"
volumes:
Expand Down
264 changes: 189 additions & 75 deletions lib/api/consumers/export-to-exploitation-db-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Transaction} from 'sequelize'
import {createFantoirCommune} from '@ban-team/fantoir'
import {findCodePostal} from 'codes-postaux/full.js'
import mongo from '../../util/mongo.cjs'
import {sequelize, District, CommonToponym, Address} from '../../util/sequelize.js'
import {sequelize, District, CommonToponym} from '../../util/sequelize.js'
import {derivePositionProps} from '../../util/geo.cjs'
import {createPseudoCodeVoieGenerator} from '../../pseudo-codes-voies.cjs'

Expand Down Expand Up @@ -54,7 +54,7 @@ const createAddressTempTableQuery = tempTableName => `
WHERE AV."districtID" = :districtID
`

const commonToponymPageQuery = tempTableName => `
const pageQuery = tempTableName => `
SELECT
*
FROM
Expand All @@ -63,13 +63,27 @@ const commonToponymPageQuery = tempTableName => `
LIMIT :limit
`

const addressPageQuery = tempTableName => `
const countQuery = tempTableName => `
SELECT
*
COUNT(*)
FROM
${tempTableName}
OFFSET :offset
LIMIT :limit
`

// Builds the SQL text that counts the rows of `tempTableName` whose
// meta->'bal'->>'isLieuDit' JSON value equals 'true'.
// NOTE(review): the table name is interpolated verbatim into the statement;
// callers are presumably expected to pass only internally generated names.
const specificCommonToponymTempTableCountQuery = tempTableName => [
  '',
  'SELECT',
  'COUNT(*)',
  'FROM',
  `${tempTableName}`,
  `WHERE meta->'bal'->>'isLieuDit' = 'true';`,
  ''
].join('\n')

// Builds the SQL text that counts the rows of `tempTableName` whose
// `certified` column is TRUE.
// NOTE(review): the table name is interpolated verbatim into the statement;
// callers are presumably expected to pass only internally generated names.
const addressCertifiedTempTableCountQuery = tempTableName => [
  '',
  'SELECT',
  'COUNT(*)',
  'FROM',
  `${tempTableName}`,
  'WHERE certified = TRUE;',
  ''
].join('\n')

export default async function exportToExploitationDB({data}) {
Expand Down Expand Up @@ -130,51 +144,78 @@ export default async function exportToExploitationDB({data}) {
return
}

// Clean collections
// Delete all data related to the district (legacy and banID)
await deleteAllLegacyDataRelatedToCOG(cog)

// Generate temporary table names based on districtID
// Setting temporary tables and collections
// Temporary table names
const tempCommonToponymTableName = `temp_common_toponym_${cog}`
const tempAddressTableName = `temp_address_${cog}`

// Drop temporary tables
try {
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
// Create temporary tables
await sequelize.query(createCommonToponymTempTableQuery(tempCommonToponymTableName), {
replacements: {districtID},
transaction,
})
console.log(`Temporary table ${tempCommonToponymTableName} created`)
await sequelize.query(createAddressTempTableQuery(tempAddressTableName), {
replacements: {districtID},
transaction,
})
console.log(`Temporary table ${tempAddressTableName} created`)
// Temporary collections names
const tempCommonToponymCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.commonToponym}_${cog}`
const tempAddressCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.address}_${cog}`
const tempDistrictCollectionName = `temp_${EXPLOITATION_DB_COLLECTION_NAMES.district}_${cog}`

// CommonToponym
// Count the total number of common toponyms and pages to process
const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / PAGE_SIZE)
// Temporary collections references
const tempCommonToponymCollection = mongo.db.collection(tempCommonToponymCollectionName)
const tempAddressCollection = mongo.db.collection(tempAddressCollectionName)
const tempDistrictCollection = mongo.db.collection(tempDistrictCollectionName)

const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
const offset = (pageNumber - 1) * PAGE_SIZE
const [pageData] = await sequelize.query(commonToponymPageQuery(tempCommonToponymTableName), {
try {
// Drop temporary tables and collections if they exist to be sure to start from a clean state
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
await deleteTempCollections([tempCommonToponymCollection, tempAddressCollection, tempDistrictCollection])

// Create temporary tables
await sequelize.query(
createCommonToponymTempTableQuery(tempCommonToponymTableName),
{
replacements: {
districtID,
offset,
limit: PAGE_SIZE
districtID
},
transaction,
raw: true,
})
// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))
await sequelize.query(
createAddressTempTableQuery(tempAddressTableName),
{
replacements: {
tempTable: tempAddressTableName,
districtID
},
transaction,
})
// CommonToponym
// Count the total number of common toponyms and pages to process
const commonToponymTempTableCountQueryResult = await sequelize.query(
countQuery(tempCommonToponymTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalCommonToponymTempTableRecordsResult = Number(commonToponymTempTableCountQueryResult?.[0]?.count)
const totalCommonToponymPages = Math.ceil(totalCommonToponymTempTableRecordsResult / PAGE_SIZE)

// Insert the data in the collection (legacy and banID)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).insertMany(formatedPageDataForLegacy, {ordered: false})
const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
try {
const offset = (pageNumber - 1) * PAGE_SIZE
const pageData = await sequelize.query(
pageQuery(tempCommonToponymTableName),
{
replacements: {
offset,
limit: PAGE_SIZE
},
type: sequelize.QueryTypes.SELECT,
transaction,
})
// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))

// Insert the data in the temp collection
await tempCommonToponymCollection.insertMany(formatedPageDataForLegacy, {ordered: false})
} catch (error) {
console.error(`Error exporting common toponym page ${pageNumber}: ${error.message}`)
throw error
}
}

const commonToponymsExportPromises = []
Expand All @@ -186,33 +227,34 @@ export default async function exportToExploitationDB({data}) {

// Address
// Count the total number of addresses and pages to process
const totalAddressRecords = await Address.count({
where: {
districtID,
isActive: true
},
transaction,
})
const addressTempTableCountQueryResult = await sequelize.query(
countQuery(tempAddressTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalAddressRecords = Number(addressTempTableCountQueryResult?.[0]?.count)
const totalAddressPages = Math.ceil(totalAddressRecords / PAGE_SIZE)

const fetchAndExportDataFromAddressPage = async pageNumber => {
const offset = (pageNumber - 1) * PAGE_SIZE
const [pageData] = await sequelize.query(addressPageQuery(tempAddressTableName), {
replacements: {
districtID,
offset,
limit: PAGE_SIZE
},
transaction,
raw: true,
})
const pageData = await sequelize.query(
pageQuery(tempAddressTableName),
{
replacements: {
offset,
limit: PAGE_SIZE
},
type: sequelize.QueryTypes.SELECT,
transaction,
})

// Format the data and calculate the fantoir code, tiles and postal code
const pageDataWithExtraDataCalculation = pageData.map(address => calculateExtraDataForAddress(address, cog, commonToponymIDFantoirCodeMap))
const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(address => formatAddressDataForLegacy(address, district, commonToponymLegacyIDCommonToponymIDMap, addressLegacyIDSet))

// Insert the data in the collection (legacy and banID)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).insertMany(formatedPageDataForLegacy, {ordered: false})
// Insert the data in the temp collection
tempAddressCollection.insertMany(formatedPageDataForLegacy, {ordered: false})
}

const addressesExportPromises = []
Expand All @@ -223,24 +265,53 @@ export default async function exportToExploitationDB({data}) {
await Promise.all(addressesExportPromises)

// District
// For Legacy collections
const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, totalCommonToponymRecords, totalAddressRecords, transaction)
await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.district).updateOne({codeCommune: cog}, {$set: districtFormatedForLegacy}, {upsert: true})
// Count the total number of "lieu-dit" common toponym used for the district legacy format
const specificCommonToponymTempTableCountQueryResult = await sequelize.query(
specificCommonToponymTempTableCountQuery(tempCommonToponymTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalSpecifCommonToponymRecords = Number(specificCommonToponymTempTableCountQueryResult?.[0]?.count)

// Count the total number of certified address used for the district legacy format
const addressCertifiedTempTableCountQueryResult = await sequelize.query(
addressCertifiedTempTableCountQuery(tempAddressTableName),
{
type: sequelize.QueryTypes.SELECT,
transaction,
})
const totalAddressCertifiedRecords = Number(addressCertifiedTempTableCountQueryResult?.[0]?.count)

// Commit the transaction once the temporary tables are created
await transaction.commit()

// Format the district data for the legacy format
const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, {totalCommonToponymRecords, totalSpecifCommonToponymRecords, totalAddressRecords, totalAddressCertifiedRecords})

// Insert the data in the temp collection
await tempDistrictCollection.insertOne(districtFormatedForLegacy)

// Pseudo code voie generator saving data
await pseudoCodeVoieGenerator.save()

// Drop the old data
await deleteOldDataFromFinaleCollections(cog)

// Merge the temporary collections into the final collections
await mergeTempCollectionsIntoFinaleCollections(tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection)

// Drop temporary collections
await deleteTempCollections([tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection])

// Drop temporary tables
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
} catch (error) {
await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
console.error(`Exporting districtID ${districtID} failed: ${error.message}`)
await deleteTempCollections([tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection])
await deleteTempTables([tempCommonToponymTableName, tempAddressTableName])
throw error
}
// Commit the transaction

await transaction.commit()
console.log(`Exporting districtID ${districtID} done`)
} catch (error) {
await transaction.rollback()
Expand All @@ -251,13 +322,55 @@ export default async function exportToExploitationDB({data}) {

// Helpers

// Helpers for exploitation DB
// Helpers for the temporary tables and collections
// Removes, one collection after the other, every document of the final
// exploitation collections whose `codeCommune` equals `cog`:
// a single district document first, then all common toponyms, then all addresses.
const deleteOldDataFromFinaleCollections = async cog => {
  const finalCollection = key => mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES[key])

  await finalCollection('district').deleteOne({codeCommune: cog})
  await finalCollection('commonToponym').deleteMany({codeCommune: cog})
  await finalCollection('address').deleteMany({codeCommune: cog})
}

// Copies the content of the three temporary collections into their final
// counterparts by running, on each temporary collection, an aggregation
// pipeline ending with a `$merge` stage that targets the final collection name.
// The three pipelines are started together and awaited as a group.
const mergeTempCollectionsIntoFinaleCollections = async (tempDistrictCollection, tempCommonToponymCollection, tempAddressCollection) => {
  // Each pair is [temporary collection reference, name of the final collection]
  const mergePlan = [
    [tempDistrictCollection, EXPLOITATION_DB_COLLECTION_NAMES.district],
    [tempCommonToponymCollection, EXPLOITATION_DB_COLLECTION_NAMES.commonToponym],
    [tempAddressCollection, EXPLOITATION_DB_COLLECTION_NAMES.address]
  ]

  const pendingMerges = mergePlan.map(async ([sourceCollection, targetName]) => {
    const pipeline = [
      {$match: {}},
      {$merge: {into: targetName}}
    ]
    // Draining the cursor with toArray() is what actually runs the pipeline
    await sourceCollection.aggregate(pipeline).toArray()
  })

  await Promise.all(pendingMerges)
}

// Drops every collection of `collectionReferences` that is currently listed
// in the database; references whose `collectionName` is not listed are ignored.
const deleteTempCollections = async collectionReferences => {
  // Snapshot of the collection names present in the database right now
  const collectionInfos = await mongo.db.listCollections().toArray()
  const knownNames = new Set()
  for (const {name} of collectionInfos) {
    knownNames.add(name)
  }

  const dropIfPresent = async reference => {
    if (!knownNames.has(reference.collectionName)) {
      return
    }

    await reference.drop()
  }

  await Promise.all(collectionReferences.map(reference => dropIfPresent(reference)))
}

const deleteAllLegacyDataRelatedToCOG = async cog => {
await Promise.all([
mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).deleteMany({codeCommune: cog}),
mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).deleteMany({codeCommune: cog}),
])
const deleteTempTables = async tableNames => {
const promises = tableNames.map(tableName => sequelize.query(`DROP TABLE IF EXISTS ${tableName}`))
await Promise.all(promises)
}

// Helpers for calculation
Expand Down Expand Up @@ -361,6 +474,7 @@ const calculateAddressPostalCode = (commonToponymIDFantoirCodeMap, address, cog)
}
}

// Helpers to calculate the geometry and tiles
const calculateCommonToponymGeometryAndTiles = commonToponym => {
const {geometry: geometryFromCommonToponym, centroid} = commonToponym
let geometryFromCentroid
Expand Down
Loading
Loading