diff --git a/.env.sample b/.env.sample
index 8fceec6c..13b2098c 100644
--- a/.env.sample
+++ b/.env.sample
@@ -38,6 +38,8 @@ GAZETTEER_DB_PATH=data/gazetteer.sqlite
 MAJIC_PATH=/data/majic.sqlite
 CONTOURS_DATA_PATH=data/communes-50m.sqlite
 COMMUNES_LOCAUX_ADRESSES_DATA_PATH=data/communes-locaux-adresses.json
+CP_PATH=db-migrations/data/cp_group.geojson
+DATANOVA_PATH=db-migrations/data/datanova_06.csv
 
 # Others
 DEPARTEMENTS= # Comma separated list of departements for dev only
diff --git a/.gitignore b/.gitignore
index 3027a08d..5b370814 100644
--- a/.gitignore
+++ b/.gitignore
@@ -68,4 +68,5 @@ typings/
 /toolbox.dev/data
 
 # Migration data backup
-db-migrations/migrations/data-backup
\ No newline at end of file
+db-migrations/migrations/data-backup
+db-migrations/data/
\ No newline at end of file
diff --git a/db-migrations/migrations/20240619135943-init-postal-area-table.cjs b/db-migrations/migrations/20240619135943-init-postal-area-table.cjs
new file mode 100644
index 00000000..2f93af3e
--- /dev/null
+++ b/db-migrations/migrations/20240619135943-init-postal-area-table.cjs
@@ -0,0 +1,126 @@
+'use strict'
+
+const fs = require('fs')
+const {Transform} = require('stream')
+const JSONStream = require('JSONStream')
+
+const {POSTGRES_BAN_USER} = process.env
+const {CP_PATH} = process.env
+
+// Creates external.postal_area, keeps its "updatedAt" column current with a trigger
+// and loads every feature of the GeoJSON file referenced by CP_PATH into the table.
+/** @type {import('sequelize-cli').Migration} */
+module.exports = {
+  async up(queryInterface, Sequelize) {
+    // Fail fast, before touching the database, when the data file is not configured
+    if (!CP_PATH) {
+      throw new Error('CP_PATH must be set to the path of the postal areas GeoJSON file')
+    }
+
+    await queryInterface.sequelize.query('CREATE SCHEMA IF NOT EXISTS external;')
+    await queryInterface.sequelize.query(`GRANT USAGE ON SCHEMA external TO "${POSTGRES_BAN_USER}";`)
+
+    await queryInterface.createTable('postal_area', {
+      id: {
+        type: Sequelize.INTEGER,
+        primaryKey: true,
+        autoIncrement: true,
+        allowNull: false,
+      },
+      postalCode: {
+        type: Sequelize.STRING,
+        allowNull: false,
+      },
+      inseeCom: {
+        type: Sequelize.STRING,
+        allowNull: false,
+      },
+      geometry: {
+        type: Sequelize.GEOMETRY,
+        allowNull: false,
+      },
+      createdAt: {
+        type: Sequelize.DATE,
+        allowNull: true,
+      },
+      updatedAt: {
+        type: Sequelize.DATE,
+        allowNull: true,
+      },
+    }, {
+      schema: 'external',
+      ifNotExists: true,
+    })
+
+    await queryInterface.sequelize.query(`
+      CREATE OR REPLACE FUNCTION update_updated_at_column()
+      RETURNS TRIGGER AS $$
+      BEGIN
+        NEW."updatedAt" = NOW();
+        RETURN NEW;
+      END;
+      $$ LANGUAGE plpgsql;
+    `)
+
+    // CREATE TRIGGER has no IF NOT EXISTS form: drop first so that a re-run after a
+    // partial failure does not stop on "trigger already exists"
+    await queryInterface.sequelize.query('DROP TRIGGER IF EXISTS update_postal_area_updated_at ON external.postal_area;')
+    await queryInterface.sequelize.query(`
+      CREATE TRIGGER update_postal_area_updated_at
+      BEFORE UPDATE ON external.postal_area
+      FOR EACH ROW
+      EXECUTE FUNCTION update_updated_at_column();
+    `)
+
+    const {sequelize} = queryInterface
+
+    // Inserts one GeoJSON feature; values are passed as bind parameters
+    const insertFeature = async feature => {
+      const {cp: postalCode, insee_com: inseeCom} = feature.properties
+      const geom = JSON.stringify(feature.geometry)
+      const query = `
+        INSERT INTO external.postal_area ("postalCode", "inseeCom", geometry, "createdAt", "updatedAt")
+        VALUES ($1, $2, ST_SetSRID(ST_GeomFromGeoJSON($3), 2154), NOW(), NOW())
+      `
+      await sequelize.query(query, {
+        bind: [postalCode, inseeCom, geom],
+      })
+    }
+
+    // Features are parsed and inserted one at a time, so the file is never fully loaded in memory
+    const source = fs.createReadStream(CP_PATH)
+    const parser = JSONStream.parse('features.*')
+    const writer = new Transform({
+      objectMode: true,
+      async transform(feature, encoding, callback) {
+        try {
+          await insertFeature(feature)
+          callback()
+        } catch (error) {
+          callback(error)
+        }
+      },
+    })
+
+    return new Promise((resolve, reject) => {
+      // .pipe() does not forward errors downstream: without a listener on each stage an
+      // unreadable file or invalid JSON would crash the process or leave this promise pending
+      const fail = error => {
+        source.destroy()
+        reject(error)
+      }
+
+      source.on('error', fail)
+      parser.on('error', fail)
+      writer.on('error', fail)
+      writer.on('finish', resolve)
+      source.pipe(parser).pipe(writer)
+    })
+  },
+
+  async down(queryInterface, _Sequelize) {
+    await queryInterface.dropTable({tableName: 'postal_area', schema: 'external'})
+    await queryInterface.sequelize.query('DROP FUNCTION IF EXISTS update_updated_at_column() CASCADE;')
+    await queryInterface.sequelize.query('DROP SCHEMA IF EXISTS external CASCADE;')
+  },
+}
diff --git a/db-migrations/migrations/20240621094048-init-datanova-table.cjs b/db-migrations/migrations/20240621094048-init-datanova-table.cjs
new file mode 100644
index 00000000..31c7d248
--- /dev/null
+++
b/db-migrations/migrations/20240621094048-init-datanova-table.cjs
@@ -0,0 +1,147 @@
+'use strict'
+
+const fs = require('fs')
+const path = require('path')
+const Papa = require('papaparse')
+
+const {DATANOVA_PATH} = process.env
+
+// Creates external.datanova and fills it from the CSV file referenced by DATANOVA_PATH:
+// one row per INSEE code, holding its postal codes and a postal code -> label mapping.
+module.exports = {
+  async up(queryInterface, Sequelize) {
+    const transaction = await queryInterface.sequelize.transaction()
+    try {
+      await queryInterface.sequelize.query('CREATE SCHEMA IF NOT EXISTS external', {transaction})
+
+      const {POSTGRES_BAN_USER} = process.env
+      await queryInterface.sequelize.query(`GRANT USAGE ON SCHEMA external TO "${POSTGRES_BAN_USER}"`, {transaction})
+
+      await queryInterface.createTable({
+        schema: 'external',
+        tableName: 'datanova'
+      }, {
+        id: {
+          type: Sequelize.INTEGER,
+          primaryKey: true,
+          autoIncrement: true
+        },
+        inseeCom: {
+          type: Sequelize.STRING,
+          allowNull: false,
+        },
+        postalCodes: {
+          type: Sequelize.ARRAY(Sequelize.STRING),
+          allowNull: false,
+        },
+        libelleAcheminementWithPostalCodes: {
+          type: Sequelize.TEXT,
+          allowNull: false,
+        },
+        createdAt: {
+          type: Sequelize.DATE,
+          allowNull: false,
+          defaultValue: Sequelize.fn('now')
+        },
+        updatedAt: {
+          type: Sequelize.DATE,
+          allowNull: false,
+          defaultValue: Sequelize.fn('now')
+        }
+      }, {transaction})
+
+      const csvFilePath = path.resolve(DATANOVA_PATH)
+
+      const csvFileContent = fs.readFileSync(csvFilePath, 'utf8')
+
+      console.log('CSV file read successfully')
+
+      const dataRaw = Papa.parse(csvFileContent, {
+        header: true,
+        transformHeader(name) {
+          switch (name.toLowerCase()) {
+            case 'code_commune_insee':
+              return 'codeInsee'
+            case 'nom_de_la_commune':
+              return 'nomCommune'
+            case 'code_postal':
+              return 'codePostal'
+            case 'libelle_d_acheminement':
+              return 'libelleAcheminement'
+            case 'ligne_5':
+              return 'ligne5'
+            case '_geopoint':
+              return 'geopoint'
+            default:
+              return name
+          }
+        },
+        skipEmptyLines: true,
+      })
+
+      console.log('CSV file parsed successfully')
+
+      // Group the CSV rows by INSEE code; the first label seen for a postal code wins
+      const inseeDataMap = dataRaw.data.reduce((acc, {codeInsee, codePostal, libelleAcheminement}) => {
+        if (!acc[codeInsee]) {
+          acc[codeInsee] = {
+            inseeCom: codeInsee,
+            postalCodes: new Set(),
+            libelleAcheminementWithPostalCodes: {},
+            createdAt: new Date(),
+            updatedAt: new Date(),
+          }
+        }
+
+        acc[codeInsee].postalCodes.add(codePostal)
+        if (!acc[codeInsee].libelleAcheminementWithPostalCodes[codePostal]) {
+          acc[codeInsee].libelleAcheminementWithPostalCodes[codePostal] = libelleAcheminement
+        }
+
+        return acc
+      }, {})
+
+      const formattedData = Object.values(inseeDataMap).map(entry => ({
+        ...entry,
+        postalCodes: [...entry.postalCodes],
+        libelleAcheminementWithPostalCodes: JSON.stringify(entry.libelleAcheminementWithPostalCodes)
+      }))
+
+      await queryInterface.bulkInsert({schema: 'external', tableName: 'datanova'}, formattedData, {transaction})
+      console.log('Data inserted successfully into external.datanova table')
+
+      // Convert the column to JSONB after insertion
+      await queryInterface.sequelize.query(`
+        ALTER TABLE external.datanova
+        ALTER COLUMN "libelleAcheminementWithPostalCodes"
+        TYPE JSONB USING "libelleAcheminementWithPostalCodes"::JSONB
+      `, {transaction})
+      console.log('Column libelleAcheminementWithPostalCodes converted to JSONB')
+
+      await transaction.commit()
+    } catch (error) {
+      await transaction.rollback()
+      console.error('Error during migration:', error)
+      // Rethrow: a migration that resolves is recorded as applied, although
+      // everything it did has just been rolled back
+      throw error
+    }
+  },
+
+  async down(queryInterface) {
+    const transaction = await queryInterface.sequelize.transaction()
+    try {
+      await queryInterface.dropTable({schema: 'external', tableName: 'datanova'}, {transaction})
+      console.log('Table external.datanova dropped successfully')
+
+      await queryInterface.sequelize.query('DROP SCHEMA IF EXISTS external CASCADE', {transaction})
+      console.log('Schema external dropped successfully')
+
+      await transaction.commit()
+    } catch (error) {
+      await transaction.rollback()
+      console.error('Error during migration rollback:', error)
+      throw error
+    }
+  }
+}
diff --git a/db-migrations/migrations/20240625132013-init-postal-code-view.cjs
b/db-migrations/migrations/20240625132013-init-postal-code-view.cjs
new file mode 100644
index 00000000..185e925c
--- /dev/null
+++ b/db-migrations/migrations/20240625132013-init-postal-code-view.cjs
@@ -0,0 +1,241 @@
+'use strict'
+
+const {POSTGRES_BAN_USER} = process.env
+
+// Creates ban.address_view_cp and ban.common_toponym_view_cp, which add a postal code,
+// its label and the source of that postal code (source_cp) to each active row, then
+// drops the ban.address_view and ban.common_toponym_view views.
+/** @type {import('sequelize-cli').Migration} */
+module.exports = {
+  async up(queryInterface) {
+    const bboxBufferAdressView = 50
+    const addressBboxBuffer = 200
+    const bboxBuffer = 100
+
+    try {
+      // Create the address_view
+
+      await queryInterface.sequelize.query(`
+        CREATE VIEW ban."address_view_cp" AS
+        WITH address_view AS (
+          SELECT
+            A.*,
+            ST_Transform(ST_Buffer(ST_Transform(ST_Envelope(ST_SetSRID(ST_GeomFromGeoJSON((A.positions[1])->'geometry'), 4326)), 2154), ${bboxBufferAdressView}, 'join=mitre endcap=square'), 4326) AS bbox
+          FROM
+            ban.address AS A
+          WHERE A."isActive" = true
+          ORDER BY A.id ASC ),
+        postal_codes_array AS (
+          SELECT
+            a.*,
+            array_length(d."postalCodes", 1) AS array_length,
+            d."postalCodes" AS postalCodes,
+            b.meta->'insee'->>'cog' AS insee_com,
+            d."libelleAcheminementWithPostalCodes"
+          FROM
+            address_view AS a
+          LEFT JOIN
+            ban.district AS b
+            ON a."districtID" = b.id
+          LEFT JOIN
+            external.datanova AS d
+            ON b.meta->'insee'->>'cog' = d."inseeCom"
+        )
+        SELECT
+          pca.id,
+          pca."mainCommonToponymID",
+          pca."secondaryCommonToponymIDs",
+          pca."districtID",
+          pca."number",
+          pca."suffix",
+          pca."labels",
+          pca."certified",
+          pca."positions",
+          pca."updateDate",
+          pca."meta",
+          pca."range_validity",
+          pca."isActive",
+          pca."bbox",
+          pca.insee_com,
+          CASE
+            WHEN pca.array_length = 1 THEN pca.postalCodes[1]
+            WHEN pca.array_length > 1
+            THEN (
+              SELECT c."postalCode"
+              FROM external.postal_area AS c
+              WHERE pca.insee_com = c."inseeCom"
+              ORDER BY ST_Area(ST_Intersection(ST_Transform(pca.bbox, 2154), ST_Transform(c.geometry, 2154))) desc
+              LIMIT 1
+            )
+            ELSE NULL
+          END AS postal_code,
+          CASE
+            WHEN pca.array_length = 1 THEN pca."libelleAcheminementWithPostalCodes"->>pca.postalCodes[1]
+            WHEN pca.array_length > 1
+            THEN (
+              SELECT pca."libelleAcheminementWithPostalCodes"->>c."postalCode"
+              FROM external.postal_area AS c
+              WHERE pca.insee_com = c."inseeCom"
+              ORDER BY ST_Area(ST_Intersection(ST_Transform(pca.bbox, 2154), ST_Transform(c.geometry, 2154))) DESC
+              LIMIT 1
+            )
+            ELSE NULL
+          END AS "libelleAcheminement",
+          pca.postalCodes,
+          pca."libelleAcheminementWithPostalCodes",
+          CASE
+            WHEN pca.array_length = 1 THEN 'DATANOVA'
+            WHEN pca.array_length > 1 THEN
+              CASE
+                WHEN EXISTS (
+                  SELECT 1
+                  FROM external.postal_area AS c
+                  WHERE pca.insee_com = c."inseeCom"
+                  AND ST_Intersects(ST_Transform(pca.bbox, 2154), ST_Transform(c.geometry, 2154))
+                ) THEN 'CONTOURS_CP'
+                ELSE 'DGFIP'
+              END
+            ELSE 'DGFIP'
+          END AS source_cp
+        FROM
+          postal_codes_array AS pca
+        ORDER BY pca.id ASC
+      `)
+      await queryInterface.sequelize.query(`GRANT SELECT ON ban."address_view_cp" TO "${POSTGRES_BAN_USER}";`)
+
+      // postal_code, "libelleAcheminement" and source_cp are all resolved against used_bbox,
+      // so the label always belongs to the postal area that supplied the postal code
+      await queryInterface.sequelize.query(`
+        CREATE VIEW ban."common_toponym_view_cp" AS
+        WITH common_toponym_view AS(
+          SELECT
+            CT.id, CT."districtID", CT.labels, CT.geometry, CT."updateDate", CT.meta, CT.range_validity, CT."isActive",
+            ST_Centroid(ST_Collect(ST_SetSRID(ST_GeomFromGeoJSON((A.positions[1])->'geometry'), 4326))) AS centroid,
+            ST_Transform(ST_Buffer(ST_Transform(ST_Envelope(ST_Collect(ST_SetSRID(ST_GeomFromGeoJSON((A.positions[1])->'geometry'), 4326))), 2154), ${addressBboxBuffer}, 'join=mitre endcap=square'), 4326) AS "addressBbox",
+            ST_Transform(ST_Buffer(ST_Transform(ST_Envelope(ST_SetSRID(ST_GeomFromGeoJSON(CT.geometry), 4326)), 2154), ${bboxBuffer}, 'join=mitre endcap=square'), 4326) AS "bbox",
+            COUNT(A.id) AS "addressCount",
+            COUNT(DISTINCT CASE WHEN A.certified = true THEN A.id ELSE NULL END) AS "certifiedAddressCount"
+          FROM
+            ban.common_toponym AS CT
+          LEFT JOIN
+            ban.address AS A
+          ON
+            (CT.id = A."mainCommonToponymID"
+            OR CT.id = ANY(A."secondaryCommonToponymIDs")) AND A."isActive" = true
+          WHERE CT."isActive" = true
+          GROUP BY CT.id
+          ORDER BY CT.id ASC ),
+
+        postal_codes_array AS (
+          SELECT
+            ct.*,
+            b.meta->'insee'->>'cog' AS insee_com,
+            array_length(d."postalCodes", 1) AS array_length,
+            d."postalCodes" AS postalCodes,
+            d."libelleAcheminementWithPostalCodes",
+            CASE
+              WHEN ct."addressCount" = 0 THEN ct.bbox
+              ELSE ct."addressBbox"
+            END AS used_bbox
+          FROM
+            common_toponym_view AS ct
+          LEFT JOIN
+            ban.district AS b
+            ON ct."districtID" = b.id
+          LEFT JOIN
+            external.datanova AS d
+            ON b.meta->'insee'->>'cog' = d."inseeCom"
+        )
+        SELECT
+          pca.id,
+          pca."districtID",
+          pca.labels,
+          pca.geometry,
+          pca."updateDate",
+          pca.meta,
+          pca.range_validity,
+          pca."isActive",
+          pca.centroid,
+          pca."addressBbox",
+          pca.bbox,
+          pca."addressCount",
+          pca."certifiedAddressCount",
+          pca.insee_com,
+          CASE
+            WHEN pca.array_length = 1 THEN pca.postalCodes[1]
+            WHEN pca.array_length > 1
+            THEN (
+              SELECT c."postalCode"
+              FROM external.postal_area AS c
+              WHERE pca.insee_com = c."inseeCom"
+              ORDER BY ST_Area(ST_Intersection(ST_Transform(pca.used_bbox, 2154), ST_Transform(c.geometry, 2154))) DESC
+              LIMIT 1
+            )
+            ELSE NULL
+          END AS postal_code,
+          CASE
+            WHEN pca.array_length = 1 THEN pca."libelleAcheminementWithPostalCodes"->>pca.postalCodes[1]
+            WHEN pca.array_length > 1
+            THEN (
+              SELECT pca."libelleAcheminementWithPostalCodes"->>c."postalCode"
+              FROM external.postal_area AS c
+              WHERE pca.insee_com = c."inseeCom"
+              ORDER BY ST_Area(ST_Intersection(ST_Transform(pca.used_bbox, 2154), ST_Transform(c.geometry, 2154))) DESC
+              LIMIT 1
+            )
+            ELSE NULL
+          END AS "libelleAcheminement",
+          pca.postalCodes,
+          pca."libelleAcheminementWithPostalCodes",
+          CASE
+            WHEN pca.array_length = 1 THEN 'DATANOVA'
+            WHEN pca.array_length > 1 THEN
+              CASE
+                WHEN EXISTS (
+                  SELECT 1
+                  FROM external.postal_area AS c
+                  WHERE pca.insee_com = c."inseeCom"
+                  AND ST_Intersects(ST_Transform(pca.used_bbox, 2154), ST_Transform(c.geometry, 2154))
+                ) THEN 'CONTOURS_CP'
+                ELSE 'DGFIP'
+              END
+            ELSE 'DGFIP'
+          END AS source_cp,
+          pca.used_bbox
+        FROM
+          postal_codes_array AS pca
+        ORDER BY pca.id ASC
+      `)
+      await queryInterface.sequelize.query(`GRANT SELECT ON ban."common_toponym_view_cp" TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA ban TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT USAGE ON SCHEMA ban TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT ALL PRIVILEGES ON SCHEMA ban TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA ban TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT USAGE ON SCHEMA public TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT ALL PRIVILEGES ON SCHEMA public TO "${POSTGRES_BAN_USER}";`)
+      await queryInterface.sequelize.query(`GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO "${POSTGRES_BAN_USER}";`)
+
+      // Drop unused view
+
+      await queryInterface.sequelize.query('DROP VIEW IF EXISTS ban."address_view";')
+      await queryInterface.sequelize.query('DROP VIEW IF EXISTS ban."common_toponym_view";')
+    } catch (error) {
+      console.log(error)
+      // Rethrow: a migration that resolves is recorded as applied even when the
+      // views were not created
+      throw error
+    }
+  },
+
+  async down(queryInterface) {
+    try {
+      // Drop the address_view if it exists
+      await queryInterface.sequelize.query('DROP VIEW IF EXISTS ban."address_view_cp";')
+      await queryInterface.sequelize.query('DROP VIEW IF EXISTS ban."common_toponym_view_cp";')
+    } catch (error) {
+      console.log(error)
+      throw error
+    }
+  }
+}
diff --git a/docker-compose.yml b/docker-compose.yml
index ee137314..f9b82f7a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -57,6 +57,8 @@ services:
       - REDIS_URL=redis://redis
       - FANTOIR_PATH=${FANTOIR_PATH}
       - GAZETTEER_DB_PATH=${GAZETTEER_DB_PATH}
+      - CP_PATH=${CP_PATH}
+      -
DATANOVA_PATH=${DATANOVA_PATH}
       - CONTOURS_DATA_PATH=${CONTOURS_DATA_PATH}
       - COMMUNES_LOCAUX_ADRESSES_DATA_PATH=${COMMUNES_LOCAUX_ADRESSES_DATA_PATH}
       - DEPARTEMENTS=${DEPARTEMENTS}
diff --git a/lib/api/consumers/export-to-exploitation-db-consumer.js b/lib/api/consumers/export-to-exploitation-db-consumer.js
index f7de0dd0..9559fc7f 100644
--- a/lib/api/consumers/export-to-exploitation-db-consumer.js
+++ b/lib/api/consumers/export-to-exploitation-db-consumer.js
@@ -36,21 +36,38 @@ const EXPLOITATION_DB_COLLECTION_NAMES = {
 }
 
 // QUERIES
-const commonToponymPageQuery = `
+const createCommonToponymTempTableQuery = tempTableName => `
+  CREATE TEMP TABLE ${tempTableName} AS
   SELECT
     CTV.*
   FROM
-    ban.common_toponym_view AS CTV
+    ban.common_toponym_view_cp AS CTV
   WHERE CTV."districtID" = :districtID
-  OFFSET :offset
-  LIMIT :limit
 `
-const addressPageQuery = `
+
+const createAddressTempTableQuery = tempTableName => `
+  CREATE TEMP TABLE ${tempTableName} AS
   SELECT
     AV.*
   FROM
-    ban.address_view AS AV
+    ban.address_view_cp AS AV
   WHERE AV."districtID" = :districtID
+`
+
+const commonToponymPageQuery = tempTableName => `
+  SELECT
+    *
+  FROM
+    ${tempTableName}
+  OFFSET :offset
+  LIMIT :limit
+`
+
+const addressPageQuery = tempTableName => `
+  SELECT
+    *
+  FROM
+    ${tempTableName}
   OFFSET :offset
   LIMIT :limit
 `
@@ -67,6 +84,11 @@ export default async function exportToExploitationDB({data}) {
   })
 
   try {
+    // Then start your transaction
+    const transaction = await sequelize.transaction({
+      isolationLevel: Transaction.ISOLATION_LEVELS.REPEATABLE_READ
+    })
+
     // Find the district
     const district = await District.findOne({
       where: {id: districtID}, transaction,
@@ -117,86 +139,121 @@ export default async function exportToExploitationDB({data}) {
     // Delete all data related to the district (legacy and banID)
     await deleteAllLegacyDataRelatedToCOG(cog)
 
-    // CommonToponym
-    // Count the total number of common toponyms and pages to process
-    const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / PAGE_SIZE)
-
-    const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
-      const offset = (pageNumber - 1) * PAGE_SIZE
-      const [pageData] = await sequelize.query(commonToponymPageQuery, {
-        replacements: {
-          districtID,
-          offset,
-          limit: PAGE_SIZE},
+    // Generate temporary table names based on districtID
+    const tempCommonToponymTableName = `temp_common_toponym_${cog}`
+    const tempAddressTableName = `temp_address_${cog}`
+
+    // Drop temporary tables
+    try {
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
+      // Create temporary tables
+      await sequelize.query(createCommonToponymTempTableQuery(tempCommonToponymTableName), {
+        replacements: {districtID},
         transaction,
-        raw: true,
       })
-      // Format the data and calculate the fantoir code, tiles and postal code
-      const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
-      const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))
-
-      // Insert the data in the collection (legacy and banID)
-      await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).insertMany(formatedPageDataForLegacy, {ordered: false})
-    }
-
-    const commonToponymsExportPromises = []
-    for (let pageNumber = 1; pageNumber <= totalCommonToponymPages; pageNumber++) {
-      commonToponymsExportPromises.push(fetchAndExportDataFromCommonToponymPage(pageNumber))
-    }
-
-    await Promise.all(commonToponymsExportPromises)
-
-    // Address
-    // Count the total number of addresses and pages to process
-    const totalAddressRecords = await Address.count({
-      where: {
-        districtID,
-        isActive: true
-      },
-      transaction,
-    })
-    const totalAddressPages = Math.ceil(totalAddressRecords / PAGE_SIZE)
-
-    const fetchAndExportDataFromAddressPage = async pageNumber => {
-      const offset = (pageNumber - 1) * PAGE_SIZE
-      const [pageData] = await sequelize.query(addressPageQuery, {
-        replacements: {
+      console.log(`Temporary table ${tempCommonToponymTableName} created`)
+      await sequelize.query(createAddressTempTableQuery(tempAddressTableName), {
+        replacements: {districtID},
+        transaction,
+      })
+      console.log(`Temporary table ${tempAddressTableName} created`)
+
+      // CommonToponym
+      // Count the total number of common toponyms and pages to process
+      const totalCommonToponymPages = Math.ceil(totalCommonToponymRecords / PAGE_SIZE)
+
+      const fetchAndExportDataFromCommonToponymPage = async pageNumber => {
+        const offset = (pageNumber - 1) * PAGE_SIZE
+        const [pageData] = await sequelize.query(commonToponymPageQuery(tempCommonToponymTableName), {
+          replacements: {
+            districtID,
+            offset,
+            limit: PAGE_SIZE
+          },
+          transaction,
+          raw: true,
+        })
+        // Format the data and calculate the fantoir code, tiles and postal code
+        const pageDataWithExtraDataCalculation = pageData.map(commonToponym => calculateExtraDataForCommonToponym(commonToponym, cog, fantoirFinder, commonToponymIDFantoirCodeMap))
+        const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(commonToponym => formatCommonToponymDataForLegacy(commonToponym, district, pseudoCodeVoieGenerator, commonToponymLegacyIDCommonToponymIDMap, commonToponymLegacyIDSet))
+
+        // Insert the data in the collection (legacy and banID)
+        await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.commonToponym).insertMany(formatedPageDataForLegacy, {ordered: false})
+      }
+
+      const commonToponymsExportPromises = []
+      for (let pageNumber = 1; pageNumber <= totalCommonToponymPages; pageNumber++) {
+        commonToponymsExportPromises.push(fetchAndExportDataFromCommonToponymPage(pageNumber))
+      }
+
+      await Promise.all(commonToponymsExportPromises)
+
+      // Address
+      // Count the total number of addresses and pages to process
+      const totalAddressRecords = await Address.count({
+        where: {
           districtID,
-          offset,
-          limit: PAGE_SIZE},
+          isActive: true
+        },
         transaction,
-        raw: true,
       })
-
-      // Format the data and calculate the fantoir code, tiles and postal code
-      const pageDataWithExtraDataCalculation = pageData.map(address => calculateExtraDataForAddress(address, cog, commonToponymIDFantoirCodeMap))
-      const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(address => formatAddressDataForLegacy(address, district, commonToponymLegacyIDCommonToponymIDMap, addressLegacyIDSet))
-
-      // Insert the data in the collection (legacy and banID)
-      await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).insertMany(formatedPageDataForLegacy, {ordered: false})
-    }
-
-    const addressesExportPromises = []
-    for (let pageNumber = 1; pageNumber <= totalAddressPages; pageNumber++) {
-      addressesExportPromises.push(fetchAndExportDataFromAddressPage(pageNumber))
+      const totalAddressPages = Math.ceil(totalAddressRecords / PAGE_SIZE)
+
+      const fetchAndExportDataFromAddressPage = async pageNumber => {
+        const offset = (pageNumber - 1) * PAGE_SIZE
+        const [pageData] = await sequelize.query(addressPageQuery(tempAddressTableName), {
+          replacements: {
+            districtID,
+            offset,
+            limit: PAGE_SIZE
+          },
+          transaction,
+          raw: true,
+        })
+
+        // Format the data and calculate the fantoir code, tiles and postal code
+        const pageDataWithExtraDataCalculation = pageData.map(address => calculateExtraDataForAddress(address, cog, commonToponymIDFantoirCodeMap))
+        const formatedPageDataForLegacy = pageDataWithExtraDataCalculation.map(address => formatAddressDataForLegacy(address, district, commonToponymLegacyIDCommonToponymIDMap, addressLegacyIDSet))
+
+        // Insert the data in the collection (legacy and banID)
+        await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.address).insertMany(formatedPageDataForLegacy, {ordered: false})
+      }
+
+      const addressesExportPromises = []
+      for (let pageNumber = 1; pageNumber <= totalAddressPages; pageNumber++) {
+        addressesExportPromises.push(fetchAndExportDataFromAddressPage(pageNumber))
+      }
+
+      await Promise.all(addressesExportPromises)
+
+      // District
+      // For Legacy collections
+      const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, totalCommonToponymRecords, totalAddressRecords, transaction)
+      await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.district).updateOne({codeCommune: cog}, {$set: districtFormatedForLegacy}, {upsert: true})
+
+      // Pseudo code voie generator saving data
+      await pseudoCodeVoieGenerator.save()
+
+      // Drop temporary tables
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
+    } catch (error) {
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempCommonToponymTableName}`, {transaction})
+      await sequelize.query(`DROP TABLE IF EXISTS ${tempAddressTableName}`, {transaction})
+      console.error(`Exporting districtID ${districtID} failed: ${error.message}`)
+      // Rethrow so the outer handler rolls the transaction back: swallowing the error here
+      // would commit a partial export and report it as done
+      throw error
     }
-
-    await Promise.all(addressesExportPromises)
-
-    // District
-    // For Legacy collections
-    const districtFormatedForLegacy = await formatDistrictDataForLegacy(district, totalCommonToponymRecords, totalAddressRecords, transaction)
-    await mongo.db.collection(EXPLOITATION_DB_COLLECTION_NAMES.district).updateOne({codeCommune: cog}, {$set: districtFormatedForLegacy}, {upsert: true})
-
-    // Pseudo code voie generator saving data
-    await pseudoCodeVoieGenerator.save()
-
     // Commit the transaction
+
     await transaction.commit()
     console.log(`Exporting districtID ${districtID} done`)
   } catch (error) {
     await transaction.rollback()
-    console.error(`Exporting districtID ${districtID} failed: ${error}`)
+    console.error(`Exporting districtID ${districtID} failed: ${error.message}`)
+    throw error
   }
 }
 
@@ -274,19 +331,44 @@ const calculateFantoirCode
= (fantoirFinder, labelValue, codeAncienneCommune) => // Helpers to calculate the postal code const calculateCommonToponymPostalCode = (commonToponymIDFantoirCodeMap, commonToponym, cog) => { - const fantoirCode = commonToponymIDFantoirCodeMap.get(commonToponym.id) - const {codePostal, libelleAcheminement} = findCodePostal(cog, fantoirCode) - return {codePostal, libelleAcheminement} + try { + const fantoirCode = commonToponymIDFantoirCodeMap.get(commonToponym.id) + let codePostal + let libelleAcheminement + if (commonToponym.postal_code === null || commonToponym.libelleAcheminement === null || commonToponym.source_cp === 'DGFIP') { + ({codePostal, libelleAcheminement} = findCodePostal(cog, fantoirCode)) + } else { + codePostal = commonToponym.postal_code + libelleAcheminement = commonToponym.libelleAcheminement + } + + return {codePostal, libelleAcheminement} + } catch (error) { + console.error('Error querying database:', error) + throw error + } } const calculateAddressPostalCode = (commonToponymIDFantoirCodeMap, address, cog) => { - const fantoirCode = commonToponymIDFantoirCodeMap.get(address.mainCommonToponymID) - const {number, suffix} = address - const {codePostal, libelleAcheminement} = findCodePostal(cog, fantoirCode, number, suffix) - return {codePostal, libelleAcheminement} + try { + const fantoirCode = commonToponymIDFantoirCodeMap.get(address.mainCommonToponymID) + const {number, suffix} = address + let codePostal + let libelleAcheminement + if (address.postal_code === null || address.libelleAcheminement === null || address.source_cp === 'DGFIP') { + ({codePostal, libelleAcheminement} = findCodePostal(cog, fantoirCode, number, suffix)) + } else { + codePostal = address.postal_code + libelleAcheminement = address.libelleAcheminement + } + + return {codePostal, libelleAcheminement} + } catch (error) { + console.error('Error querying database:', error) + throw error + } } -// Helpers to calculate the tiles const calculateCommonToponymGeometryAndTiles = 
commonToponym => { const {geometry: geometryFromCommonToponym, centroid} = commonToponym let geometryFromCentroid