Skip to content

Commit

Permalink
feat: Embed on-instance Whisper model for audio/mp4 transcribing (#449)
Browse files Browse the repository at this point in the history
* feat: Embed on-instance Whisper model for audio/mp4 transcribing
resolves #329

* additional logging

* add placeholder for tmp folder in collector storage
Add cleanup of hotdir and tmp on collector boot to prevent hanging files
run model loading and file conversion concurrently

* update README

* update model size

* update supported filetypes
  • Loading branch information
timothycarambat authored Dec 15, 2023
1 parent 719521c commit 61db981
Show file tree
Hide file tree
Showing 12 changed files with 636 additions and 64 deletions.
2 changes: 2 additions & 0 deletions collector/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { ACCEPTED_MIMES } = require("./utils/constants");
const { reqBody } = require("./utils/http");
const { processSingleFile } = require("./processSingleFile");
const { processLink } = require("./processLink");
const { wipeCollectorStorage } = require("./utils/files");
const app = express();

app.use(cors({ origin: true }));
Expand Down Expand Up @@ -66,6 +67,7 @@ app.all("*", function (_, response) {

app
.listen(8888, async () => {
await wipeCollectorStorage();
console.log(`Document processor app listening on port 8888`);
})
.on("error", function (_) {
Expand Down
7 changes: 5 additions & 2 deletions collector/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@
"node": ">=18.12.1"
},
"scripts": {
"dev": "NODE_ENV=development nodemon --trace-warnings index.js",
"dev": "NODE_ENV=development nodemon --ignore hotdir --ignore storage --trace-warnings index.js",
"start": "NODE_ENV=production node index.js",
"lint": "yarn prettier --write ./processSingleFile ./processLink ./utils index.js"
},
"dependencies": {
"@googleapis/youtube": "^9.0.0",
"@xenova/transformers": "^2.11.0",
"bcrypt": "^5.1.0",
"body-parser": "^1.20.2",
"cors": "^2.8.5",
"dotenv": "^16.0.3",
"express": "^4.18.2",
"extract-zip": "^2.0.1",
"fluent-ffmpeg": "^2.1.2",
"js-tiktoken": "^1.0.8",
"langchain": "0.0.201",
"mammoth": "^1.6.0",
Expand All @@ -33,7 +35,8 @@
"pdf-parse": "^1.1.1",
"puppeteer": "^21.6.1",
"slugify": "^1.6.6",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"wavefile": "^11.0.0"
},
"devDependencies": {
"nodemon": "^2.0.22",
Expand Down
146 changes: 146 additions & 0 deletions collector/processSingleFile/convert/asAudio.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
const fs = require("fs");
const path = require("path");
const { v4 } = require("uuid");
const {
createdDate,
trashFile,
writeToServerDocuments,
} = require("../../utils/files");
const { tokenizeString } = require("../../utils/tokenizer");
const { default: slugify } = require("slugify");
const { LocalWhisper } = require("../../utils/WhisperProviders/localWhisper");

/**
 * Transcribe an uploaded audio/video file with the local Whisper model and
 * store the resulting text as a server document.
 *
 * Loading the model and converting the file to WAV sample data are started
 * together. The source file is passed to `trashFile` on every exit path.
 *
 * @param {Object} params
 * @param {string} params.fullFilePath - Location of the uploaded file on disk.
 * @param {string} params.filename - Name used for logging and document metadata.
 * @returns {Promise<{success: boolean, reason: (string|null)}>} Outcome of the
 *   conversion; `reason` is only set when `success` is false.
 */
async function asAudio({ fullFilePath = "", filename = "" }) {
  const whisper = new LocalWhisper();

  console.log(`-- Working ${filename} --`);

  // allSettled (instead of hand-rolled promise wrappers) guarantees we always
  // continue, even when the model fails to load, and that the conversion has
  // finished reading the source file before it is trashed.
  const [audioResult, modelResult] = await Promise.allSettled([
    convertToWavAudioData(fullFilePath),
    whisper.client(),
  ]);
  const audioData =
    audioResult.status === "fulfilled" ? audioResult.value : null;

  if (!audioData) {
    console.error(`Failed to parse content from ${filename}.`);
    trashFile(fullFilePath);
    return {
      success: false,
      reason: `Failed to parse content from ${filename}.`,
    };
  }

  if (modelResult.status === "rejected") {
    console.error(
      `Failed to load the transcription model for ${filename}.`,
      modelResult.reason
    );
    trashFile(fullFilePath);
    return {
      success: false,
      reason: `Failed to load the transcription model for ${filename}.`,
    };
  }
  const transcriber = modelResult.value;

  console.log(`[Model Working]: Transcribing audio data to text`);
  let content;
  try {
    const result = await transcriber(audioData, {
      chunk_length_s: 30,
      stride_length_s: 5,
    });
    content = result?.text;
  } catch (error) {
    // Report the failure through the normal result object and make sure the
    // upload does not linger in the hot directory.
    console.error(`Failed to transcribe ${filename}.`, error);
    trashFile(fullFilePath);
    return { success: false, reason: `Failed to transcribe ${filename}.` };
  }

  if (!content?.length) {
    console.error(`Resulting text content was empty for ${filename}.`);
    trashFile(fullFilePath);
    return { success: false, reason: `No text content found in ${filename}.` };
  }

  // Declared locally: the previous bare assignment created an implicit global.
  const data = {
    id: v4(),
    url: "file://" + fullFilePath,
    title: filename,
    docAuthor: "no author found",
    description: "No description found.",
    // NOTE(review): this label says "pdf" although the source is audio/video;
    // left untouched because stored metadata may be consumed elsewhere.
    docSource: "pdf file uploaded by the user.",
    chunkSource: filename,
    published: createdDate(fullFilePath),
    wordCount: content.split(" ").length,
    pageContent: content,
    token_count_estimate: tokenizeString(content).length,
  };

  writeToServerDocuments(data, `${slugify(filename)}-${data.id}`);
  trashFile(fullFilePath);
  console.log(
    `[SUCCESS]: ${filename} transcribed, converted & ready for embedding.\n`
  );
  return { success: true, reason: null };
}

/**
 * Load a media file as 32-bit float, 16 kHz sample data.
 *
 * Non-.wav inputs are first converted with ffmpeg into a uniquely named
 * temporary .wav inside the collector's `storage/tmp` folder. That temporary
 * file is removed on every exit path, including failed conversions.
 *
 * @param {string} sourcePath - Path of the media file to load.
 * @returns {Promise<*>} The first channel's samples as returned by
 *   `WaveFile.getSamples()` (channels 0 and 1 are mixed together when more
 *   than one channel is present), or `null` when anything fails.
 */
async function convertToWavAudioData(sourcePath) {
  // Tracks the temporary conversion output so `finally` can always remove it.
  let tmpWavPath = null;

  try {
    const wavefile = require("wavefile");
    const ffmpeg = require("fluent-ffmpeg");
    const outFolder = path.resolve(__dirname, `../../storage/tmp`);
    if (!fs.existsSync(outFolder)) fs.mkdirSync(outFolder, { recursive: true });

    let wavPath = sourcePath;
    const fileExtension = path.extname(sourcePath).toLowerCase();
    if (fileExtension !== ".wav") {
      console.log(
        `[Conversion Required] ${fileExtension} file detected - converting to .wav`
      );
      tmpWavPath = path.resolve(outFolder, `${v4()}.wav`);
      const outputFile = tmpWavPath;

      // Adapt ffmpeg's event API to a promise that resolves to a success flag.
      const success = await new Promise((resolve) => {
        ffmpeg(sourcePath)
          .toFormat("wav")
          .on("error", (error) => {
            console.error(`[Conversion Error] ${error.message}`);
            resolve(false);
          })
          .on("progress", (progress) =>
            console.log(
              `[Conversion Processing]: ${progress.targetSize}KB converted`
            )
          )
          .on("end", () => {
            console.log("[Conversion Complete]: File converted to .wav!");
            resolve(true);
          })
          .save(outputFile);
      });
      if (!success)
        throw new Error(
          "[Conversion Failed]: Could not convert file to .wav format!"
        );

      wavPath = outputFile;
    }

    const buffer = await fs.promises.readFile(wavPath);

    const wavFile = new wavefile.WaveFile(buffer);
    wavFile.toBitDepth("32f");
    wavFile.toSampleRate(16000);

    let audioData = wavFile.getSamples();
    if (Array.isArray(audioData)) {
      if (audioData.length > 1) {
        const SCALING_FACTOR = Math.sqrt(2);

        // Merge channels into first channel to save memory
        for (let i = 0; i < audioData[0].length; ++i) {
          audioData[0][i] =
            (SCALING_FACTOR * (audioData[0][i] + audioData[1][i])) / 2;
        }
      }
      audioData = audioData[0];
    }

    return audioData;
  } catch (error) {
    console.error(`convertToWavAudioData`, error);
    return null;
  } finally {
    if (tmpWavPath) {
      try {
        // `force` makes this a no-op when ffmpeg never produced the file.
        fs.rmSync(tmpWavPath, { force: true });
      } catch (cleanupError) {
        console.error(
          `convertToWavAudioData: could not remove temporary file`,
          cleanupError
        );
      }
    }
  }
}

module.exports = asAudio;
2 changes: 2 additions & 0 deletions collector/storage/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tmp/*
!tmp/.placeholder
Empty file.
59 changes: 59 additions & 0 deletions collector/utils/WhisperProviders/localWhisper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const path = require("path");
const fs = require("fs");

/**
 * Provider for the on-instance Whisper speech recognition model.
 *
 * Model files are cached under `<STORAGE_DIR>/models` when the STORAGE_DIR
 * environment variable is set, otherwise under the server's storage folder
 * relative to this file.
 */
class LocalWhisper {
  constructor() {
    // Model Card: https://huggingface.co/Xenova/whisper-small
    this.model = "Xenova/whisper-small";

    const modelsRoot = process.env.STORAGE_DIR
      ? path.resolve(process.env.STORAGE_DIR, `models`)
      : path.resolve(__dirname, `../../../server/storage/models`);
    this.cacheDir = path.resolve(modelsRoot);
    this.modelPath = path.resolve(this.cacheDir, "Xenova", "whisper-small");

    // Existing installations may not have the models folder yet.
    const cacheDirMissing = !fs.existsSync(this.cacheDir);
    if (cacheDirMissing) {
      fs.mkdirSync(this.cacheDir, { recursive: true });
    }
  }

  /**
   * Build the automatic-speech-recognition pipeline for the configured model.
   * Download progress is logged only when the model folder is not on disk yet.
   *
   * @returns {Promise<*>} Whatever `pipeline()` from @xenova/transformers
   *   resolves to for the "automatic-speech-recognition" task.
   * @throws Re-throws any error raised while importing or creating the pipeline.
   */
  async client() {
    const needsDownload = !fs.existsSync(this.modelPath);
    if (needsDownload) {
      console.log(
        "\x1b[34m[INFO]\x1b[0m The native whisper model has never been run and will be downloaded right now. Subsequent runs will be faster. (~250MB)\n\n"
      );
    }

    const pipelineOptions = { cache_dir: this.cacheDir };
    if (needsDownload) {
      // Show download progress if we need to download any files
      pipelineOptions.progress_callback = (data) => {
        if (!data.hasOwnProperty("progress")) return;
        console.log(
          `\x1b[34m[Embedding - Downloading Model Files]\x1b[0m ${
            data.file
          } ${~~data?.progress}%`
        );
      };
    }

    try {
      // The library is ESM-only, so it is loaded through a dynamic import.
      const { pipeline } = await import("@xenova/transformers");
      return await pipeline(
        "automatic-speech-recognition",
        this.model,
        pipelineOptions
      );
    } catch (error) {
      console.error("Failed to load the native whisper model:", error);
      throw error;
    }
  }
}

module.exports = {
LocalWhisper,
};
50 changes: 0 additions & 50 deletions collector/utils/asDocx.js

This file was deleted.

11 changes: 11 additions & 0 deletions collector/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const ACCEPTED_MIMES = {

"application/pdf": [".pdf"],
"application/mbox": [".mbox"],

"audio/wav": [".wav"],
"audio/mpeg": [".mp3"],

"video/mp4": [".mp4"],
"video/mpeg": [".mpeg"],
};

const SUPPORTED_FILETYPE_CONVERTERS = {
Expand All @@ -31,6 +37,11 @@ const SUPPORTED_FILETYPE_CONVERTERS = {
".odp": "./convert/asOfficeMime.js",

".mbox": "./convert/asMbox.js",

".mp3": "./convert/asAudio.js",
".wav": "./convert/asAudio.js",
".mp4": "./convert/asAudio.js",
'.mpeg': "./convert/asAudio.js",
};

module.exports = {
Expand Down
40 changes: 40 additions & 0 deletions collector/utils/files/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,48 @@ function writeToServerDocuments(
return;
}

// When required we can wipe the entire collector hotdir and tmp storage. If a
// large file failure left files behind that could not be removed, a reboot
// will force remove them.
/**
 * Remove every entry from the collector's `hotdir` and `storage/tmp` folders,
 * keeping only their placeholder files. Both folders are cleaned concurrently
 * and a missing or unreadable folder is skipped.
 *
 * @returns {Promise<void>}
 */
async function wipeCollectorStorage() {
  await Promise.all([
    removeDirectoryEntries(
      path.resolve(__dirname, "../../hotdir"),
      "__HOTDIR__.md"
    ),
    removeDirectoryEntries(
      path.resolve(__dirname, "../../storage/tmp"),
      ".placeholder"
    ),
  ]);
  console.log(`Collector hot directory and tmp storage wiped!`);
  return;
}

/**
 * Best-effort removal of the entries directly inside `directory`.
 *
 * @param {string} directory - Folder whose entries should be removed.
 * @param {string} keepFile - Entry name that must be left in place.
 * @returns {Promise<void>}
 */
async function removeDirectoryEntries(directory, keepFile) {
  let files;
  try {
    files = await fs.promises.readdir(directory);
  } catch {
    // Folder is missing or unreadable: nothing to clean. Returning here is the
    // fix for the old code, which fell through and iterated `undefined`.
    return;
  }

  for (const file of files) {
    if (file === keepFile) continue;
    try {
      fs.rmSync(path.join(directory, file));
    } catch {
      // Deliberately ignored: cleanup is best-effort and must not block boot.
    }
  }
}

module.exports = {
trashFile,
createdDate,
writeToServerDocuments,
wipeCollectorStorage,
};
Loading

0 comments on commit 61db981

Please sign in to comment.