Skip to content

Commit

Permalink
fix: build S3 export files as stream (#1388)
Browse files Browse the repository at this point in the history
* fix: add logs

* fix: build file as stream

* Update server/src/jobs/partenaireExport/exportJobsToS3.ts

Co-authored-by: Moroine Bentefrit <[email protected]>

* fix: remove unused deps

---------

Co-authored-by: Moroine Bentefrit <[email protected]>
  • Loading branch information
kevbarns and moroine authored Aug 1, 2024
1 parent e43920c commit c2d912d
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions server/src/jobs/partenaireExport/exportJobsToS3.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { writeFileSync } from "fs"
import { createWriteStream } from "fs"
import { Transform } from "stream"
import { pipeline } from "stream/promises"

import { LBA_ITEM_TYPE } from "shared/constants/lbaitem"

import { logger } from "../../common/logger"
import { uploadFileToS3 } from "../../common/utils/awsUtils"
import { getDbCollection } from "../../common/utils/mongodbUtils"

Expand All @@ -13,9 +16,35 @@ interface IGeneratorParams {
}

async function generateJsonExport({ collection, query, projection, fileName }: IGeneratorParams): Promise<string> {
logger.info(`Generating file ${fileName}`)
const filePath = new URL(`./${fileName}.json`, import.meta.url)
const data = await getDbCollection(collection).find(query).project(projection).toArray()
writeFileSync(filePath, JSON.stringify(data, null, 4))
const cursor = await getDbCollection(collection).find(query).project(projection)
const writable = createWriteStream(filePath)

// Transform stream to add commas between objects and brackets at the beginning and end
let isFirst = true
const transform = new Transform({
writableObjectMode: true,
readableObjectMode: true,
transform(chunk, encoding, callback) {
if (isFirst) {
this.push("[")
this.push(JSON.stringify(chunk, null, 4))
isFirst = false
} else {
this.push(",")
this.push(JSON.stringify(chunk, null, 4))
}
callback()
},
flush(callback) {
this.push("]")
callback()
},
})

await pipeline(cursor, transform, writable)

return filePath.pathname
}

Expand Down Expand Up @@ -54,8 +83,18 @@ async function exportLbaJobsToS3() {
fileName: LBA_ITEM_TYPE.RECRUTEURS_LBA,
}
await Promise.all([
generateJsonExport(offres_emploi_lba).then((path) => uploadFileToS3({ key: path.split("/").pop() as string, filePath: path, noCache: true })),
generateJsonExport(recruteurs_lba).then((path) => uploadFileToS3({ key: path.split("/").pop() as string, filePath: path, noCache: true })),
generateJsonExport(offres_emploi_lba).then((path) => {
const key = path.split("/").pop() as string
logger.info(`Uploading file ${key} to S3`)
uploadFileToS3({ key, filePath: path, noCache: true })
logger.info(`file ${key} uploaded`)
}),
generateJsonExport(recruteurs_lba).then((path) => {
const key = path.split("/").pop() as string
logger.info(`Uploading file ${key} to S3`)
uploadFileToS3({ key, filePath: path, noCache: true })
logger.info(`File ${key} uploaded`)
}),
])
}

Expand Down

0 comments on commit c2d912d

Please sign in to comment.