From 12f36b5904239a3d3a6fab3409aa783247ccb5dd Mon Sep 17 00:00:00 2001 From: Moroine Bentefrit Date: Fri, 6 Dec 2024 16:36:26 +0100 Subject: [PATCH] fix: sauvegarde de la formation avant fiabilisation --- .../src/common/actions/effectifs.actions.ts | 28 +++- .../actions/effectifs.statut.actions.ts | 14 +- .../fiabilisation-certification.ts | 27 ++-- .../src/jobs/hydrate/deca/hydrate-deca-raw.ts | 20 +-- .../update-effectifs-lieu-de-formation.ts | 7 +- .../src/jobs/ingestion/process-ingestion.ts | 121 ++++++++++-------- server/src/jobs/jobs.ts | 28 ++-- shared/models/data/effectifs.model.ts | 6 +- 8 files changed, 143 insertions(+), 108 deletions(-) diff --git a/server/src/common/actions/effectifs.actions.ts b/server/src/common/actions/effectifs.actions.ts index 08a416369..1c34923d4 100644 --- a/server/src/common/actions/effectifs.actions.ts +++ b/server/src/common/actions/effectifs.actions.ts @@ -1,7 +1,7 @@ import type { ICertification } from "api-alternance-sdk"; import Boom from "boom"; import { cloneDeep, isObject, merge, mergeWith, reduce, set, uniqBy } from "lodash-es"; -import { ObjectId, WithoutId } from "mongodb"; +import { ObjectId, type WithoutId } from "mongodb"; import { IOpcos, IRncp } from "shared/models"; import { IEffectif } from "shared/models/data/effectifs.model"; import { IEffectifDECA } from "shared/models/data/effectifsDECA.model"; @@ -127,23 +127,23 @@ export const lockEffectif = async (effectif: IEffectif) => { return updated.value; }; -export const addComputedFields = async ({ +export const addComputedFields = async >({ organisme, effectif, certification, }: { organisme?: IOrganisme; - effectif?: IEffectif | WithoutId; + effectif?: T; certification: ICertification | null; -}): Promise> => { - const computedFields: Partial = {}; +}): Promise => { + const computedFields: IEffectif["_computed"] = {}; if (organisme) { computedFields.organisme = generateOrganismeComputed(organisme); } if (effectif) { - const statut = createComputedStatutObject(effectif as IEffectif, new Date()); + const statut = createComputedStatutObject(effectif, new Date()); computedFields.statut = statut; } @@ -161,6 +161,22 @@ export const addComputedFields = async ({ return computedFields; }; +export const withComputedFields = async >( + effectif: T, + { + organisme, + certification, + }: { + organisme?: IOrganisme; + certification: ICertification | null; + } +): Promise => { + return { + ...effectif, + _computed: await addComputedFields({ organisme, effectif, certification }), + }; +}; + export async function getEffectifForm(effectifId: ObjectId): Promise { let effectif: IEffectif | IEffectifDECA | null = await effectifsDb().findOne({ _id: effectifId }); diff --git a/server/src/common/actions/effectifs.statut.actions.ts b/server/src/common/actions/effectifs.statut.actions.ts index 17bf8ca51..7f93ea2f4 100644 --- a/server/src/common/actions/effectifs.statut.actions.ts +++ b/server/src/common/actions/effectifs.statut.actions.ts @@ -1,9 +1,10 @@ import { captureException } from "@sentry/node"; import Boom from "boom"; import { cloneDeep } from "lodash-es"; -import { MongoServerError, UpdateFilter } from "mongodb"; +import { MongoServerError, UpdateFilter, type WithoutId } from "mongodb"; import { STATUT_APPRENANT, StatutApprenant } from "shared/constants"; import { IEffectif, IEffectifApprenant, IEffectifComputedStatut } from "shared/models/data/effectifs.model"; +import type { IEffectifDECA } from "shared/models/data/effectifsDECA.model"; import { addDaysUTC } from "shared/utils"; import logger from "../logger"; @@ -43,7 +44,10 @@ function shouldUpdateStatut(effectif: IEffectif): boolean { * @param {Date} evaluationDate La date à laquelle l'évaluation du statut est effectuée. * @returns {IEffectifComputedStatut} L'objet de statut calculé pour l'effectif. */ -export function createComputedStatutObject(effectif: IEffectif, evaluationDate: Date): IEffectifComputedStatut | null { +export function createComputedStatutObject( + effectif: IEffectif | WithoutId, + evaluationDate: Date +): IEffectifComputedStatut | null { try { const parcours = generateUnifiedParcours(effectif, evaluationDate); @@ -59,7 +63,7 @@ export function createComputedStatutObject(effectif: IEffectif, evaluationDate: { context: "createComputedStatutObject", evaluationDate, - effectifId: effectif._id, + effectifId: "_id" in effectif ? effectif._id : null, errorStack: error instanceof Error ? error.stack : undefined, } ); @@ -97,7 +101,7 @@ function handleUpdateError(err: unknown, effectif: IEffectif) { } const generateUnifiedParcours = ( - effectif: IEffectif, + effectif: IEffectif | WithoutId, evaluationDate: Date ): { valeur: StatutApprenant; date: Date }[] => { let parcours: { valeur: StatutApprenant; date: Date }[] = []; @@ -139,7 +143,7 @@ function deduplicateAndSortParcours(parcours: { valeur: StatutApprenant; date: D } function determineStatutsByContrats( - effectif: IEffectif, + effectif: IEffectif | WithoutId, evaluationDate?: Date ): { valeur: StatutApprenant; date: Date }[] { if (!effectif.formation?.date_entree && !effectif.formation?.date_fin) { diff --git a/server/src/jobs/fiabilisation/certification/fiabilisation-certification.ts b/server/src/jobs/fiabilisation/certification/fiabilisation-certification.ts index b7d6319ff..e2cb0e08e 100644 --- a/server/src/jobs/fiabilisation/certification/fiabilisation-certification.ts +++ b/server/src/jobs/fiabilisation/certification/fiabilisation-certification.ts @@ -210,28 +210,25 @@ export async function getEffectifCertification(effectif: Pick>( +export function fiabilisationEffectifFormation>( effectif: T, certification: ICertification | null -): T { +): T["formation"] { if (!certification) { - return effectif; + return effectif.formation; } const niveau = certification.intitule.niveau.rncp?.europeen ?? certification.intitule.niveau.cfd?.europeen ?? null; return { - ...effectif, - formation: { - ...effectif.formation, - cfd: certification.identifiant.cfd, - rncp: certification.identifiant.rncp, - libelle_long: - certification.intitule.cfd?.long ?? certification.intitule.rncp ?? effectif.formation?.libelle_long ?? null, - libelle_court: - certification.intitule.cfd?.court ?? certification.intitule.rncp ?? effectif.formation?.libelle_court ?? null, - niveau, - niveau_libelle: getNiveauFormationFromLibelle(niveau), - }, + ...effectif.formation, + cfd: certification.identifiant.cfd, + rncp: certification.identifiant.rncp, + libelle_long: + certification.intitule.cfd?.long ?? certification.intitule.rncp ?? effectif.formation?.libelle_long ?? null, + libelle_court: + certification.intitule.cfd?.court ?? certification.intitule.rncp ?? effectif.formation?.libelle_court ?? null, + niveau, + niveau_libelle: getNiveauFormationFromLibelle(niveau), }; } diff --git a/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts b/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts index 18fd999f5..df84349e2 100644 --- a/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts +++ b/server/src/jobs/hydrate/deca/hydrate-deca-raw.ts @@ -3,7 +3,7 @@ import { normalize } from "path"; import { captureException } from "@sentry/node"; import { MongoClient, WithoutId } from "mongodb"; import { SOURCE_APPRENANT } from "shared/constants"; -import { IEffectif, IOrganisme } from "shared/models"; +import { IOrganisme } from "shared/models"; import { IAirbyteRawBalDeca } from "shared/models/data/airbyteRawBalDeca.model"; import { zApprenant } from "shared/models/data/effectifs/apprenant.part"; import { zContrat } from "shared/models/data/effectifs/contrat.part"; @@ -11,7 +11,7 @@ import { IEffectifDECA } from "shared/models/data/effectifsDECA.model"; import { zodOpenApi } from "shared/models/zodOpenApi"; import { cyrb53Hash, getYearFromDate } from "shared/utils"; -import { addComputedFields } from "@/common/actions/effectifs.actions"; +import { withComputedFields } from "@/common/actions/effectifs.actions"; import { checkIfEffectifExists } from "@/common/actions/engine/engine.actions"; import { getOrganismeByUAIAndSIRET } from "@/common/actions/organismes/organismes.actions"; import parentLogger from "@/common/logger"; @@ -20,8 +20,8 @@ import { getMongodbUri } from "@/common/mongodb"; import { __dirname } from "@/common/utils/esmUtils"; import config from "@/config"; import { + fiabilisationEffectifFormation, getEffectifCertification, - withEffectifFormation, } from "@/jobs/fiabilisation/certification/fiabilisation-certification"; const logger = parentLogger.child({ module: "job:hydrate:contrats-deca-raw" }); @@ -191,11 +191,13 @@ async function transformDocument(document: IAirbyteRawBalDeca): Promise, certification }), - is_deca_compatible: !organisme.is_transmission_target, - }; + return withComputedFields( + { + ...effectif, + formation: fiabilisationEffectifFormation(effectif, certification), + is_deca_compatible: !organisme.is_transmission_target, + }, + { organisme, certification } + ); } diff --git a/server/src/jobs/hydrate/effectifs/update-effectifs-lieu-de-formation.ts b/server/src/jobs/hydrate/effectifs/update-effectifs-lieu-de-formation.ts index 6ce3c9ebf..b5b4b9051 100644 --- a/server/src/jobs/hydrate/effectifs/update-effectifs-lieu-de-formation.ts +++ b/server/src/jobs/hydrate/effectifs/update-effectifs-lieu-de-formation.ts @@ -3,10 +3,7 @@ import { captureException } from "@sentry/node"; import { addComputedFields } from "@/common/actions/effectifs.actions"; import logger from "@/common/logger"; import { effectifsDb, organismesDb } from "@/common/model/collections"; -import { - getEffectifCertification, - withEffectifFormation, -} from "@/jobs/fiabilisation/certification/fiabilisation-certification"; +import { getEffectifCertification } from "@/jobs/fiabilisation/certification/fiabilisation-certification"; export async function hydrateEffectifsLieuDeFormation() { let nbEffectifsMisAJour = 0; @@ -91,7 +88,7 @@ export async function hydrateEffectifsLieuDeFormationVersOrganismeFormateur() { organisme_formateur_id: organismeFormateur._id, _computed: await addComputedFields({ organisme: organismeFormateur, - effectif: withEffectifFormation(effectif, certification), + effectif, certification, }), }; diff --git a/server/src/jobs/ingestion/process-ingestion.ts b/server/src/jobs/ingestion/process-ingestion.ts index b0907817f..54edfb1c2 100644 --- a/server/src/jobs/ingestion/process-ingestion.ts +++ b/server/src/jobs/ingestion/process-ingestion.ts @@ -14,7 +14,7 @@ import { IOrganisme } from "shared/models/data/organismes.model"; import { NEVER, SafeParseReturnType, ZodIssueCode } from "zod"; import { updateVoeuxAffelnetEffectif } from "@/common/actions/affelnet.actions"; -import { lockEffectif, addComputedFields, mergeEffectifWithDefaults } from "@/common/actions/effectifs.actions"; +import { lockEffectif, mergeEffectifWithDefaults, withComputedFields } from "@/common/actions/effectifs.actions"; import { buildNewHistoriqueStatutApprenant, mapEffectifQueueToEffectif, @@ -41,8 +41,8 @@ import dossierApprenantSchemaV3, { } from "@/common/validation/dossierApprenantSchemaV3"; import { + fiabilisationEffectifFormation, getEffectifCertification, - withEffectifFormation, } from "../fiabilisation/certification/fiabilisation-certification"; import { fiabilisationUaiSiret } from "../fiabilisation/uai-siret/updateFiabilisation"; @@ -277,7 +277,7 @@ async function transformEffectifQueueV3ToEffectif(rawEffectifQueued: IEffectifQu let organismeTarget: any; const result = await dossierApprenantSchemaV3() - .transform(async (effectifQueued, ctx) => { + .transform(async (effectifQueued, ctx): Promise<{ effectif: IEffectif; organisme: IOrganisme }> => { let [effectif, organismeFormateur, organismeResponsable] = await Promise.all([ transformEffectifQueueToEffectif(effectifQueued), (async () => { @@ -334,7 +334,6 @@ async function transformEffectifQueueV3ToEffectif(rawEffectifQueued: IEffectifQu } const certification = await getEffectifCertification(effectif); - effectif = withEffectifFormation(effectif, certification); // Source: https://mission-apprentissage.slack.com/archives/C02FR2L1VB8/p1695295051135549 // We compute the real duration of the formation in months, only if we have both date_entree and date_fin @@ -345,19 +344,25 @@ async function transformEffectifQueueV3ToEffectif(rawEffectifQueued: IEffectifQu } return { - effectif: { - ...effectif, - organisme_id: organismeFormateur?._id, - organisme_formateur_id: organismeFormateur?._id, - organisme_responsable_id: organismeResponsable?._id, - lieu_de_formation: { - uai: effectifQueued.etablissement_lieu_de_formation_uai, - siret: effectifQueued.etablissement_lieu_de_formation_siret, - adresse: effectifQueued.etablissement_lieu_de_formation_adresse, - code_postal: effectifQueued.etablissement_lieu_de_formation_code_postal, + effectif: await withComputedFields( + { + ...effectif, + formation: fiabilisationEffectifFormation(effectif, certification), + organisme_id: organismeFormateur?._id, + organisme_formateur_id: organismeFormateur?._id, + organisme_responsable_id: organismeResponsable?._id, + lieu_de_formation: { + uai: effectifQueued.etablissement_lieu_de_formation_uai, + siret: effectifQueued.etablissement_lieu_de_formation_siret, + adresse: effectifQueued.etablissement_lieu_de_formation_adresse, + code_postal: effectifQueued.etablissement_lieu_de_formation_code_postal, + }, + _raw: { + formation: effectif.formation, + }, }, - _computed: await addComputedFields({ organisme: organismeFormateur, effectif, certification }), - }, + { organisme: organismeFormateur, certification } + ), organisme: organismeFormateur, }; }) @@ -378,47 +383,53 @@ async function transformEffectifQueueV1V2ToEffectif(rawEffectifQueued: IEffectif const itemProcessingInfos: ItemProcessingInfos = {}; let organismeTarget; - const result = await dossierApprenantSchemaV1V2() - .transform(async (effectifQueued, ctx) => { - let [effectif, organisme] = await Promise.all([ - transformEffectifQueueToEffectif(effectifQueued), - (async () => { - const { organisme, stats } = await findOrganismeWithStats( - effectifQueued.uai_etablissement, - effectifQueued.siret_etablissement - ); - organismeTarget = organisme; - Object.assign(itemProcessingInfos, addPrefixToProperties("organisme_", stats)); - return organisme; - })(), - ]); - - if (!organisme) { - ctx.addIssue({ - code: ZodIssueCode.custom, - message: "organisme non trouvé", - path: ["uai_etablissement", "siret_etablissement"], - params: { - uai: effectifQueued.uai_etablissement, - siret: effectifQueued.siret_etablissement, - }, - }); - return NEVER; - } + const result: SafeParseReturnType = + await dossierApprenantSchemaV1V2() + .transform(async (effectifQueued, ctx): Promise<{ effectif: IEffectif; organisme: IOrganisme }> => { + let [effectif, organisme] = await Promise.all([ + transformEffectifQueueToEffectif(effectifQueued), + (async () => { + const { organisme, stats } = await findOrganismeWithStats( + effectifQueued.uai_etablissement, + effectifQueued.siret_etablissement + ); + organismeTarget = organisme; + Object.assign(itemProcessingInfos, addPrefixToProperties("organisme_", stats)); + return organisme; + })(), + ]); + + if (!organisme) { + ctx.addIssue({ + code: ZodIssueCode.custom, + message: "organisme non trouvé", + path: ["uai_etablissement", "siret_etablissement"], + params: { + uai: effectifQueued.uai_etablissement, + siret: effectifQueued.siret_etablissement, + }, + }); + return NEVER; + } - const certification = await getEffectifCertification(effectif); - effectif = withEffectifFormation(effectif, certification); + const certification = await getEffectifCertification(effectif); - return { - effectif: { - ...effectif, - organisme_id: organisme?._id, - _computed: await addComputedFields({ organisme, effectif, certification }), - }, - organisme: organisme, - }; - }) - .safeParseAsync(rawEffectifQueued); + return { + effectif: await withComputedFields( + { + ...effectif, + organisme_id: organisme?._id, + formation: fiabilisationEffectifFormation(effectif, certification), + _raw: { + formation: effectif.formation, + }, + }, + { organisme, certification } + ), + organisme: organisme, + }; + }) + .safeParseAsync(rawEffectifQueued); return { result, itemProcessingInfos, diff --git a/server/src/jobs/jobs.ts b/server/src/jobs/jobs.ts index e4c92cfc0..5cc792bc1 100644 --- a/server/src/jobs/jobs.ts +++ b/server/src/jobs/jobs.ts @@ -19,8 +19,8 @@ import { recreateIndexes } from "./db/recreateIndexes"; import { validateModels } from "./db/schemaValidation"; import { sendReminderEmails } from "./emails/reminder"; import { + fiabilisationEffectifFormation, getEffectifCertification, - withEffectifFormation, } from "./fiabilisation/certification/fiabilisation-certification"; import { transformSansContratsToAbandonsDepuis, transformRupturantsToAbandonsDepuis } from "./fiabilisation/effectifs"; import { hydrateRaisonSocialeEtEnseigneOFAInconnus } from "./fiabilisation/ofa-inconnus"; @@ -333,26 +333,24 @@ export async function setupJobProcessor() { // In case of interruption, we can restart the job from the last processed effectif // Any updated effectif has either been updated by the job or has been updated by the processing queue - const cursor = effectifsDb().find({ updated_at: { $lte: job.updated_at } }); + const cursor = effectifsDb().find({ created_at: { $lte: job.created_at } }, { sort: { created_at: -1 } }); let bulk: IEffectif[] = []; const processEffectif = async (effectif: IEffectif) => { const certification = await getEffectifCertification(effectif); - const update: Partial = { - formation: withEffectifFormation(effectif, certification).formation, - }; - - if (effectif._computed?.formation) { - update._computed = { + const update = { + formation: fiabilisationEffectifFormation(effectif, certification), + "_raw.formation": effectif.formation, + _computed: { ...effectif._computed, formation: { - ...effectif._computed.formation, + ...effectif._computed?.formation, codes_rome: certification?.domaines.rome.rncp?.map(({ code }) => code) ?? null, }, - }; - } + }, + }; await effectifsDb() .updateOne({ _id: effectif._id }, { $set: update }) @@ -363,6 +361,7 @@ export async function setupJobProcessor() { motif: MOTIF_SUPPRESSION.Doublon, description: "Suppression du doublon suite à la migration des formations", }); + return; } throw err; @@ -370,9 +369,14 @@ export async function setupJobProcessor() { }; for await (const effectif of cursor) { + if (effectif._raw?.formation) { + // Already migrated + continue; + } + bulk.push(effectif); - if (bulk.length > 200) { + if (bulk.length > 100) { await Promise.all(bulk.map(processEffectif)); if (signal.aborted) { return; diff --git a/shared/models/data/effectifs.model.ts b/shared/models/data/effectifs.model.ts index 12c4c58a0..a90c4f318 100644 --- a/shared/models/data/effectifs.model.ts +++ b/shared/models/data/effectifs.model.ts @@ -77,7 +77,6 @@ const indexes: [IndexSpecification, CreateIndexesOptions][] = [ [{ source: 1 }, { name: "source" }], [{ source_organisme_id: 1 }, { name: "source_organisme_id" }], [{ created_at: 1 }, { name: "created_at" }], - [{ updated_at: 1 }, {}], [{ "_computed.organisme.region": 1 }, {}], [{ "_computed.organisme.departement": 1 }, {}], [{ "_computed.organisme.academie": 1 }, {}], @@ -180,6 +179,11 @@ export const zEffectif = z.object({ .nullish(), }) .nullish(), + _raw: z + .object({ + formation: zFormationEffectif.nullish(), + }) + .nullish(), _computed: z .object( {