Skip to content

Commit

Permalink
Merge pull request #70 from DavidWeiss2/fix/batch-backfill-v18
Browse files Browse the repository at this point in the history
Fix/batch backfill v18
  • Loading branch information
jasonbosco authored Sep 24, 2024
2 parents f5b137d + c76ba51 commit 5611e56
Showing 1 changed file with 58 additions and 59 deletions.
117 changes: 58 additions & 59 deletions functions/src/backfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,84 +10,83 @@ admin.initializeApp({

/**
 * Decide whether a write to the backfill trigger document should start a backfill.
 *
 * A backfill runs only when the written document has `trigger` set to `true`
 * (boolean or the string "true"). If the document also carries a
 * `firestore_collections` array, this collection's path must be listed in it.
 * A `firestore_collections` value that is not an array is ignored.
 *
 * @param {Object} snapshot Change object handed to the `onWrite` handler; only
 *   `snapshot.after.get(field)` is used.
 * @return {boolean} `true` if the backfill should proceed, `false` (after
 *   logging the reason) otherwise.
 */
const validateBackfillRun = (snapshot) => {
  if (![true, "true"].includes(snapshot.after.get("trigger"))) {
    functions.logger.error(
      `Skipping backfill. \`trigger: true\` key was not found in Firestore document ${config.typesenseBackfillTriggerDocumentInFirestore}.`,
    );
    return false;
  }

  // Check if there's a collection specific sync setup
  const collectionsToSync = snapshot.after.get("firestore_collections");
  if (Array.isArray(collectionsToSync) && !collectionsToSync.includes(config.firestoreCollectionPath)) {
    functions.logger.error(
      `Skipping backfill. The \`firestore_collections\` key in ${config.typesenseBackfillTriggerDocumentInFirestore} did not contain collection ${config.firestoreCollectionPath}.`,
    );
    return false;
  }

  return true;
};

module.exports = functions.firestore.document(config.typesenseBackfillTriggerDocumentInFirestore)
.onWrite(async (snapshot, context) => {
functions.logger.info("Backfilling " +
module.exports = functions.firestore.document(config.typesenseBackfillTriggerDocumentInFirestore).onWrite(async (snapshot, context) => {
functions.logger.info(
"Backfilling " +
`${config.firestoreCollectionFields.join(",")} fields in Firestore documents ` +
`from ${config.firestoreCollectionPath} ` +
`into Typesense Collection ${config.typesenseCollectionName} ` +
`on ${config.typesenseHosts.join(",")}`);
`on ${config.typesenseHosts.join(",")}`,
);

if (!validateBackfillRun(snapshot)) {
return false;
}
if (!validateBackfillRun(snapshot)) {
return false;
}

const typesense = createTypesenseClient();
const typesense = createTypesenseClient();

const querySnapshot =
await admin.firestore().collection(config.firestoreCollectionPath).get();
let currentDocumentNumber = 0;
let currentDocumentsBatch = [];
for (const firestoreDocument of querySnapshot.docs) {
currentDocumentNumber += 1;
currentDocumentsBatch.push(await utils.typesenseDocumentFromSnapshot(firestoreDocument));
const querySnapshot = await admin.firestore().collection(config.firestoreCollectionPath);

if (currentDocumentNumber === config.typesenseBackfillBatchSize) {
try {
await typesense
.collections(encodeURIComponent(config.typesenseCollectionName))
.documents()
.import(currentDocumentsBatch, {action: "upsert"});
currentDocumentsBatch = [];
functions.logger.info(`Imported ${currentDocumentNumber} documents into Typesense`);
} catch (error) {
if (error.importResults) {
const failedItems = error.importResults.filter(
(r) => r.success === false,
);
functions.logger.error("Import failed with document errors", failedItems);
} else {
functions.logger.error("Import error", error);
}
}
}
}
if (currentDocumentsBatch.length > 0) {
try {
await typesense
.collections(encodeURIComponent(config.typesenseCollectionName))
.documents()
.import(currentDocumentsBatch, {action: "upsert"});
functions.logger.info(`Imported ${currentDocumentNumber} documents into Typesense`);
} catch (error) {
if (error.importResults) {
const failedItems = error.importResults.filter(
(r) => r.success === false,
);
functions.logger.error("Import failed with document errors", failedItems);
} else {
functions.logger.error("Import error", error);
}
}
let lastDoc = null;

do {
const queryFotThisBatch = lastDoc ? querySnapshot.startAfter(lastDoc) : querySnapshot;
const thisBatch = await queryFotThisBatch.limit(config.typesenseBackfillBatchSize).get();
if (thisBatch.empty) {
break;
}
const currentDocumentsBatch = await Promise.all(
thisBatch.docs.map(async (doc) => {
return await utils.typesenseDocumentFromSnapshot(doc);
}),
);

lastDoc = thisBatch.docs.at(-1) ?? null;
try {
await typesense.collections(encodeURIComponent(config.typesenseCollectionName)).documents().import(currentDocumentsBatch);
functions.logger.info(`Imported ${currentDocumentsBatch.length} documents into Typesense`);
} catch (error) {
functions.logger.error(`Import error in a batch of documents from ${currentDocumentsBatch[0].id} to ${lastDoc.id}`, error);
if ("importResults" in error) {
logImportErrors(error.importResults);
}
}

if (currentDocumentsBatch.length < config.typesenseBackfillBatchSize) {
break;
}
// Recurse on the next process tick, to avoid
// issues with the event loop on firebase functions related to resource release
await new Promise((resolve) => process.nextTick(resolve));
} while (lastDoc);

functions.logger.info("Done backfilling to Typesense from Firestore");
});

/**
 * Log every failed entry of a Typesense import result list.
 *
 * Entries whose `success` flag is truthy are skipped; each remaining entry is
 * logged at error level together with its `error` message. Anything that is
 * not an array (e.g. `undefined`) is ignored, so callers can pass the
 * `importResults` property of an error without checking it first.
 *
 * @param {Array<Object>} importResults Per-document import results; each entry
 *   is read for its `success` and `error` properties.
 */
function logImportErrors(importResults) {
  if (!Array.isArray(importResults)) {
    return;
  }

  for (const result of importResults) {
    if (result.success) {
      continue;
    }
    functions.logger.error(`Error importing document with error: ${result.error}`, result);
  }
}

0 comments on commit 5611e56

Please sign in to comment.