Skip to content

Commit

Permalink
feat: lbac 2302: import raw hellowork (#1329)
Browse files Browse the repository at this point in the history
* feat: lbac 2302: import raw hellowork

* Update server/src/jobs/offrePartenaire/importHelloWork.ts

Co-authored-by: Kevin Barnoin <[email protected]>

---------

Co-authored-by: Kevin Barnoin <[email protected]>
  • Loading branch information
remy-auricoste and kevbarns authored Jul 3, 2024
1 parent 23adad8 commit 70c2cb5
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ fileignoreconfig:
checksum: 8e4712bae8747c1f0f2ba8b4ac3f3af118772136c16b6318b14a2d8b7e277dc6
- filename: server/src/jobs/lba_recruteur/formulaire/misc/removeVersionKeyFromRecruiters.ts
checksum: 3cd111d8c109cfec357bae48af70d0cf5644d02cd2c4b9afc5b8aa07bccbd535
- filename: server/src/jobs/offrePartenaire/importHelloWork.ts
checksum: 26caf0b5a97ce2ff7451cc2717a3b36f38b20f94db6beb490b98b8311debfd25
- filename: server/src/security/accessApiApprentissageService.ts
checksum: 2f0bf2a0a20e071cbcaec67eebed21acd0de40b7d7a1e8178484ebc14f024ca5
- filename: server/src/security/accessTokenService.ts
Expand Down
7 changes: 7 additions & 0 deletions server/src/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ program
.option("-parallelism, [parallelism]", "Number of threads", "10")
.action(createJobAction("referentiel-opco:constructys:import"))

// Registers the `import-hellowork` CLI command and binds it to the job named "import-hellowork".
// NOTE(review): the `-parallelism, [parallelism]` option mirrors the neighbouring commands;
// confirm the HelloWork import actually reads it, otherwise it can be dropped.
program
.command("import-hellowork")
.description("Importe les offres hellowork")
.option("-q, --queued", "Run job asynchronously", false)
.option("-parallelism, [parallelism]", "Number of threads", "10")
.action(createJobAction("import-hellowork"))

/**
 * CLI entry point: hands the arguments of the current process to `program`
 * and waits until the asynchronous parsing has completed.
 */
export async function startCLI(): Promise<void> {
  const argv = process.argv
  await program.parseAsync(argv)
}
4 changes: 4 additions & 0 deletions server/src/jobs/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import updateGeoLocations from "./lbb/updateGeoLocations"
import updateLbaCompanies from "./lbb/updateLbaCompanies"
import updateOpcoCompanies from "./lbb/updateOpcoCompanies"
import { runGarbageCollector } from "./misc/runGarbageCollector"
import { importHelloWork } from "./offrePartenaire/importHelloWork"
import { exportLbaJobsToS3 } from "./partenaireExport/exportJobsToS3"
import { exportToFranceTravail } from "./partenaireExport/exportToFranceTravail"
import { activateOptoutOnEtablissementAndUpdateReferrersOnETFA } from "./rdv/activateOptoutOnEtablissementAndUpdateReferrersOnETFA"
Expand Down Expand Up @@ -366,6 +367,9 @@ export async function runJob(job: IInternalJobsCronTask | IInternalJobsSimple):
case "crons:scheduler":
return cronsScheduler()

case "import-hellowork":
return importHelloWork()

default: {
logger.warn(`Job not found ${job.name}`)
}
Expand Down
102 changes: 102 additions & 0 deletions server/src/jobs/offrePartenaire/importHelloWork.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { PassThrough, pipeline } from "node:stream"

import axios from "axios"
import { ObjectId } from "mongodb"
import * as xml2j from "xml2js"

import { logger } from "@/common/logger"
import { getDbCollection } from "@/common/utils/mongodbUtils"

// Shared XML parser. `explicitArray: false` keeps single child nodes as plain values instead of
// one-element arrays; `emptyTag: null` makes empty elements come out as `null`.
const xmlParser = new xml2j.Parser({ explicitArray: false, emptyTag: null })
// Name of the XML element that is extracted from the feed and stored as one document.
const offerNodeName = "country"
// Deliberately has no closing `>` so that elements carrying attributes are matched as well.
const openingTag = `<${offerNodeName}`
const closingTag = `</${offerNodeName}>`

/**
 * Converts one offer XML fragment to JSON and stores it in the `raw_hellowork` collection.
 *
 * @param offerXml complete XML element describing a single offer
 * @param now timestamp written to the `createdAt` field of the stored document
 */
const offerHandler = async (offerXml: string, now: Date) => {
  logger.info("parsing offer")
  const parsedOffer = await xmlParser.parseStringPromise(offerXml)
  const rawCollection = getDbCollection("raw_hellowork")
  // Explicit fields come last so they win over same-named keys coming from the XML.
  await rawCollection.insertOne({ ...parsedOffer, _id: new ObjectId(), createdAt: now })
}

/**
 * Downloads the XML feed as a stream and stores every offer element as a raw document in the
 * `raw_hellowork` collection.
 *
 * The collection is emptied first; each `<country …>…</country>` element found in the stream is
 * then passed to `offerHandler`, all documents sharing the same `createdAt` timestamp.
 *
 * @returns a promise resolving to `{ offerInsertCount }` once the whole stream has been consumed.
 *   It rejects when the download, the XML parsing or an insertion fails.
 */
export const importHelloWork = async () => {
  logger.info("deleting old data")
  await getDbCollection("raw_hellowork").deleteMany({})

  logger.info("import starting...")

  const now = new Date()
  const url = "https://aiweb.cs.washington.edu/research/projects/xmltk/xmldata/data/mondial/mondial-3.0.xml"
  const response = await axios.get(url, {
    responseType: "stream",
  })

  // Streaming decoder: a multi-byte UTF-8 character cut in two by a chunk boundary is held back
  // until its remaining bytes arrive, instead of being decoded into replacement characters.
  const utf8Decoder = new TextDecoder("utf-8")
  // Text received so far that does not yet form a complete offer (carried over between chunks).
  let pending = ""
  let offerInsertCount = 0

  const readChunk = async (str: string) => {
    // Always scan "leftover + new text" so that a tag split across two chunks is still seen whole.
    const head = { str: pending + str, index: 0 }
    const stringReader = newStringReader(head)
    for (;;) {
      stringReader.goTo(openingTag)
      if (stringReader.isDone()) {
        // No opening tag in the buffer: only its last characters can still be the beginning
        // of an opening tag that continues in the next chunk, everything before is dropped.
        const tailLength = openingTag.length - 1
        pending = tailLength > 0 ? head.str.slice(-tailLength) : ""
        return
      }
      const offerStart = head.index
      stringReader.skip(openingTag)
      const content = stringReader.takeUntil(closingTag)
      if (!stringReader.skip(closingTag)) {
        // Offer not terminated yet: keep it from its opening tag and wait for more data.
        pending = head.str.substring(offerStart)
        return
      }
      await offerHandler(openingTag + content + closingTag, now)
      // Counted only after the insertion succeeded.
      offerInsertCount++
    }
  }

  // Used as a sink: nothing is pushed downstream, the chunks are only scanned for offers.
  const xmlToJsonTransform = new PassThrough({
    transform(chunk, _encoding, callback) {
      const text = typeof chunk === "string" ? chunk : utf8Decoder.decode(chunk, { stream: true })
      readChunk(text).then(
        () => callback(),
        // Forward failures to the stream so that the pipeline stops and the job rejects,
        // rather than leaving an unhandled rejection and a pipeline that never completes.
        (err: unknown) => callback(err instanceof Error ? err : new Error(String(err)))
      )
    },
  })
  return new Promise((resolve, reject) => {
    pipeline(response.data, xmlToJsonTransform, (err) => {
      logger.info(`${offerInsertCount} offers inserted`)
      if (err) {
        logger.error("Pipeline failed.", err)
        reject(err)
      } else {
        logger.info("Pipeline succeeded.")
        resolve({
          offerInsertCount,
        })
      }
    })
  })
}

/**
 * Builds a small forward-only cursor over `stringHead.str`.
 *
 * The cursor position lives in `stringHead.index` and is mutated in place, so the caller can
 * observe how far the reader advanced.
 *
 * - `isDone()` tells whether the cursor reached the end of the string.
 * - `goTo(part)` moves the cursor to the next occurrence of `part` (or to the end if absent).
 * - `skip(part)` steps over `part` when the text at the cursor is exactly `part`, and reports
 *   whether it did.
 * - `takeUntil(part)` returns the text between the cursor and the next occurrence of `part`,
 *   leaving the cursor on that occurrence; when `part` is absent it returns the whole remainder
 *   and moves the cursor to the end.
 */
const newStringReader = (stringHead: { str: string; index: number }) => {
  const takeUntil = (part: string): string => {
    const remainder = stringHead.str.substring(stringHead.index)
    const offset = remainder.indexOf(part)
    if (offset !== -1) {
      stringHead.index += offset
      return remainder.substring(0, offset)
    }
    stringHead.index = stringHead.str.length
    return remainder
  }

  const skip = (part: string): boolean => {
    const start = stringHead.index
    const candidate = stringHead.str.substring(start, start + part.length)
    if (candidate !== part) {
      return false
    }
    stringHead.index = start + part.length
    return true
  }

  return {
    isDone: (): boolean => stringHead.str.length <= stringHead.index,
    goTo: (part: string): void => {
      takeUntil(part)
    },
    skip,
    takeUntil,
  }
}
2 changes: 2 additions & 0 deletions shared/models/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import lbaCompanyModel from "./lbaCompany.model"
import lbaCompanyLegacyModel from "./lbaCompanyLegacy.model"
import opcoModel from "./opco.model"
import optoutModel from "./optout.model"
import rawHelloWorkModel from "./rawHelloWork.model"
import recruiterModel from "./recruiter.model"
import recruteurLbaUpdateEventModel from "./recruteurLbaUpdateEvent.model"
import referentielOnisepModel from "./referentielOnisep.model"
Expand Down Expand Up @@ -57,6 +58,7 @@ const modelDescriptorMap = {
[lbaCompanyLegacyModel.collectionName]: lbaCompanyLegacyModel,
[opcoModel.collectionName]: opcoModel,
[optoutModel.collectionName]: optoutModel,
[rawHelloWorkModel.collectionName]: rawHelloWorkModel,
[recruiterModel.collectionName]: recruiterModel,
[recruteurLbaUpdateEventModel.collectionName]: recruteurLbaUpdateEventModel,
[referentielOnisepModel.collectionName]: referentielOnisepModel,
Expand Down
11 changes: 11 additions & 0 deletions shared/models/rawHelloWork.model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { z } from "zod"

import { IModelDescriptor } from "./common"

// Model descriptor for the `raw_hellowork` collection.
export default {
// Only `createdAt` is declared in the schema.
// NOTE(review): the documents presumably also carry an `_id` and the raw fields parsed from the
// XML feed, none of which are described here — confirm this loose schema is intended.
zod: z.object({
createdAt: z.date(),
}),
// No index is declared for this collection.
indexes: [],
collectionName: "raw_hellowork",
} as const satisfies IModelDescriptor

0 comments on commit 70c2cb5

Please sign in to comment.