Skip to content

Commit

Permalink
Merge pull request #163 from CheckMateSG/idempotency
Browse files Browse the repository at this point in the history
Idempotency
  • Loading branch information
sarge1989 authored Sep 28, 2023
2 parents 25d6504 + 505f3b2 commit 720308d
Show file tree
Hide file tree
Showing 8 changed files with 13,413 additions and 11,230 deletions.
7 changes: 2 additions & 5 deletions functions/src/definitions/checkerHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ if (!admin.apps.length) {
admin.initializeApp()
}

const db = admin.firestore()

const checkerHandlerWhatsapp = async function (message: Message) {
const from = message.from // extract the phone number from the webhook payload
const type = message.type
const db = admin.firestore()
let responses

switch (type) {
Expand Down Expand Up @@ -99,7 +100,6 @@ const checkerHandlerWhatsapp = async function (message: Message) {

async function onSignUp(from: string, platform = "whatsapp") {
const responses = await getResponsesObj("factChecker")
const db = admin.firestore()
let res = await sendTextMessage(
"factChecker",
from,
Expand Down Expand Up @@ -135,7 +135,6 @@ async function onMsgReplyReceipt(
platform = "whatsapp"
) {
const responses = await getResponsesObj("factChecker")
const db = admin.firestore()
const factCheckerSnap = await db.collection("factCheckers").doc(from).get()
if (factCheckerSnap.get("getNameMessageId") === messageId) {
await factCheckerSnap.ref.update({
Expand Down Expand Up @@ -164,7 +163,6 @@ async function onFactCheckerYes(
from: string,
platform = "whatsapp"
) {
const db = admin.firestore()
if (!voteRequestPath.includes("/")) {
throw new Error("The voteRequestPath does not contain a forward slash (/).")
}
Expand Down Expand Up @@ -430,7 +428,6 @@ async function onTextListReceipt(
}

async function onContinue(factCheckerId: string) {
const db = admin.firestore()
await db.collection("factCheckers").doc(factCheckerId).update({
isActive: true,
})
Expand Down
8 changes: 8 additions & 0 deletions functions/src/definitions/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ const checkUrl = function (urlString: string) {
return url.protocol === "http:" || url.protocol === "https:"
}

async function checkMessageId(messageId: string) {
const db = admin.firestore()
const messageRef = db.collection("messageIds").doc(messageId)
const messageSnap = await messageRef.get()
return messageSnap.exists
}

function stripPhone(originalStr: string, includePlaceholder = false) {
const phoneNumbers = findPhoneNumbersInText(originalStr)
let newStr = originalStr
Expand Down Expand Up @@ -91,4 +98,5 @@ export {
getThresholds,
checkUrl,
normalizeSpaces,
checkMessageId,
}
105 changes: 84 additions & 21 deletions functions/src/definitions/userHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import {
sendWhatsappContactMessage,
} from "./common/sendWhatsappMessage"
import { sendDisputeNotification } from "./common/sendMessage"
import { sleep, hashMessage, normalizeSpaces } from "./common/utils"
import {
sleep,
hashMessage,
normalizeSpaces,
checkMessageId,
} from "./common/utils"
import {
getResponsesObj,
sendMenuMessage,
Expand Down Expand Up @@ -44,6 +49,15 @@ const hashids = new Hashids(salt)
const db = admin.firestore()

const userHandlerWhatsapp = async function (message: Message) {
if (!message?.id) {
functions.logger.error("No message id")
return
}
if (await checkMessageId(message.id)) {
functions.logger.warn(`Message ${message.id} already exists`)
return
}

let from = message.from // extract the phone number from the webhook payload
let type = message.type
const responses = await getResponsesObj("user")
Expand Down Expand Up @@ -161,7 +175,7 @@ const userHandlerWhatsapp = async function (message: Message) {
step = await newTextInstanceHandler({
text: message.text.body,
timestamp: messageTimestamp,
id: message.id || null,
id: message.id,
from: from || null,
isForwarded: message?.context?.forwarded || null,
isFrequentlyForwarded: message?.context?.frequently_forwarded || null,
Expand All @@ -173,7 +187,7 @@ const userHandlerWhatsapp = async function (message: Message) {
step = await newImageInstanceHandler({
caption: message?.image?.caption || null,
timestamp: messageTimestamp,
id: message.id || null,
id: message.id,
mediaId: message?.image?.id || null,
mimeType: message?.image?.mime_type || null,
from: from || null,
Expand Down Expand Up @@ -253,14 +267,15 @@ async function newTextInstanceHandler({
}: {
text: string
timestamp: Timestamp
id: string | null
id: string
from: string | null
isForwarded: boolean | null
isFrequentlyForwarded: boolean | null
isFirstTimeUser: boolean
}) {
let hasMatch = false
let messageRef
let messageRef: FirebaseFirestore.DocumentReference | null = null
let messageUpdateObj: Object | null = null
const machineCategory = (await classifyText(text)) ?? "error"
if (from && isFirstTimeUser && machineCategory.includes("irrelevant")) {
await db.collection("users").doc(from).update({
Expand All @@ -286,7 +301,7 @@ async function newTextInstanceHandler({
let bestMatchingDocumentRef
let bestMatchingText
let similarityScore = 0
let matchedParentMessageRef
let matchedParentMessageRef = null

if (
similarity.ref &&
Expand All @@ -306,7 +321,8 @@ async function newTextInstanceHandler({
}

if (!hasMatch) {
let writeResult = await db.collection("messages").add({
messageRef = db.collection("messages").doc()
messageUpdateObj = {
machineCategory: machineCategory, //Can be "fake news" or "scam"
isMachineCategorised: !!(
machineCategory &&
Expand Down Expand Up @@ -349,17 +365,18 @@ async function newTextInstanceHandler({
: null,
customReply: null, //string
instanceCount: 0,
})
messageRef = writeResult
}
} else {
messageRef = matchedParentMessageRef
}
if (!messageRef) {
throw new Error(
functions.logger.error(
`No messageRef created or matched for whatsapp message with id ${id}`
)
return
}
const _ = await messageRef.collection("instances").add({
const instanceRef = messageRef.collection("instances").doc()
const instanceUpdateObj = {
source: "whatsapp",
id: id || null, //taken from webhook object, needed to reply
timestamp: timestamp, //timestamp, taken from webhook object (firestore timestamp data type)
Expand Down Expand Up @@ -394,7 +411,15 @@ async function newTextInstanceHandler({
},
isSatisfactionSurveySent: null,
satisfactionScore: null,
})
}
await addInstanceToDb(
id,
hasMatch,
messageRef,
messageUpdateObj,
instanceRef,
instanceUpdateObj
)
return Promise.resolve(`text_machine_${machineCategory}`)
}

Expand All @@ -413,14 +438,15 @@ async function newImageInstanceHandler({
mediaId: string | null
mimeType: string | null
timestamp: Timestamp
id: string | null
id: string
from: string | null
isForwarded: boolean | null
isFrequentlyForwarded: boolean | null
isFirstTimeUser: boolean
}) {
let filename
let messageRef
let messageRef: FirebaseFirestore.DocumentReference | null = null
let messageUpdateObj: Object | null = null
let hasMatch = false
let matchType = "none" // will be set to either "similarity" or "image" or "none"
let matchedInstanceSnap
Expand Down Expand Up @@ -512,7 +538,7 @@ async function newImageInstanceHandler({
let bestMatchingDocumentRef
let bestMatchingText
let similarityScore = 0
let matchedParentMessageRef
let matchedParentMessageRef = null
let textHash = null

if (ocrSuccess && isConvo && !!extractedMessage && !hasMatch) {
Expand Down Expand Up @@ -547,7 +573,8 @@ async function newImageInstanceHandler({
}

if (!hasMatch || (!matchedInstanceSnap && !matchedParentMessageRef)) {
let writeResult = await db.collection("messages").add({
messageRef = db.collection("messages").doc()
messageUpdateObj = {
machineCategory: machineCategory, //Can be "fake news" or "scam"
isMachineCategorised: !!(
machineCategory &&
Expand Down Expand Up @@ -587,8 +614,7 @@ async function newImageInstanceHandler({
: null,
customReply: null, //string
instanceCount: 0,
})
messageRef = writeResult
}
} else {
if (matchType === "image" && matchedInstanceSnap) {
messageRef = matchedInstanceSnap.ref.parent.parent
Expand All @@ -600,11 +626,13 @@ async function newImageInstanceHandler({
}
}
if (!messageRef) {
throw new Error(
functions.logger.error(
`No messageRef created or matched for whatsapp message with id ${id}`
)
return
}
const _ = await messageRef.collection("instances").add({
const instanceRef = messageRef.collection("instances").doc()
const instanceUpdateObj = {
source: "whatsapp",
id: id || null, //taken from webhook object, needed to reply
timestamp: timestamp, //timestamp, taken from webhook object (firestore timestamp data type)
Expand Down Expand Up @@ -644,7 +672,15 @@ async function newImageInstanceHandler({
},
isSatisfactionSurveySent: null,
satisfactionScore: null,
})
}
await addInstanceToDb(
id,
hasMatch,
messageRef,
messageUpdateObj,
instanceRef,
instanceUpdateObj
)
return Promise.resolve("image")
}

Expand Down Expand Up @@ -792,6 +828,33 @@ async function onTextListReceipt(messageObj: Message, platform = "whatsapp") {
return Promise.resolve(step)
}

async function addInstanceToDb(
id: string,
hasMatch: boolean,
messageRef: FirebaseFirestore.DocumentReference | null,
messageUpdateObj: Object | null = null,
instanceRef: FirebaseFirestore.DocumentReference,
instanceUpdateObj: Object
) {
const messageIdRef = db.collection("messageIds").doc(id)
try {
await db.runTransaction(async (t) => {
const doc = await t.get(messageIdRef)
if (doc.exists) {
return
}
if (!hasMatch && !!messageRef && !!messageUpdateObj) {
t.set(messageRef, messageUpdateObj)
}
t.set(instanceRef, instanceUpdateObj)
t.set(messageIdRef, { instanceRef: instanceRef })
})
functions.logger.log(`Transaction success for messageId ${id}!`)
} catch (e) {
functions.logger.error(`Transaction failure for messageId ${id}!`, e)
}
}

async function createNewUser(
userRef: admin.firestore.DocumentReference<admin.firestore.DocumentData>,
messageTimestamp: Timestamp
Expand Down
30 changes: 14 additions & 16 deletions functions/src/definitions/webhookHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { defineString } from "firebase-functions/params"
import { handleSpecialCommands } from "./specialCommands"
import { publishToTopic } from "./common/pubsub"
import { onRequest } from "firebase-functions/v2/https"
import { checkMessageId } from "./common/utils"

const runtimeEnvironment = defineString("ENVIRONMENT")
const testUserPhoneNumberId = defineString(
Expand Down Expand Up @@ -57,6 +58,18 @@ app.post("/whatsapp", async (req, res) => {
//handle db commands
await handleSpecialCommands(message)
} else {
if (message?.id) {
//if message has been processed before, don't even put it in queue.
if (await checkMessageId(message.id)) {
functions.logger.warn(`message ${message.id} already processed`)
res.sendStatus(200)
return
}
} else {
functions.logger.error(`message ${message.id} has no id`)
res.sendStatus(200)
return
}
if (
(type == "button" || type == "interactive" || type == "text") &&
phoneNumberId === checkerPhoneNumberId
Expand Down Expand Up @@ -122,21 +135,6 @@ app.get("/whatsapp", (req, res) => {
}
})

const webhookHandler = functions
.region("asia-southeast1")
.runWith({
secrets: [
"WHATSAPP_USER_BOT_PHONE_NUMBER_ID",
"WHATSAPP_CHECKERS_BOT_PHONE_NUMBER_ID",
"WHATSAPP_TOKEN",
"VERIFY_TOKEN",
"TYPESENSE_TOKEN",
"ML_SERVER_TOKEN",
"TELEGRAM_REPORT_BOT_TOKEN",
],
})
.https.onRequest(app)

const webhookHandlerV2 = onRequest(
{
secrets: [
Expand All @@ -148,4 +146,4 @@ const webhookHandlerV2 = onRequest(
app
)

export { app, webhookHandlerV2, webhookHandler }
export { app, webhookHandlerV2 }
1 change: 0 additions & 1 deletion functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export { onInstanceCreate } from "./definitions/onInstanceCreate"
export { onInstanceDelete } from "./definitions/onInstanceDelete"
export { onInstanceUpdate } from "./definitions/onInstanceUpdate"
export { onMessageUpdate } from "./definitions/onMessageUpdate"
export { webhookHandler } from "./definitions/webhookHandler"
export { webhookHandlerV2 } from "./definitions/webhookHandler"
export { onVoteRequestUpdate } from "./definitions/onVoteRequestUpdate"
export { healthcheck } from "./definitions/healthcheck"
Expand Down
Loading

0 comments on commit 720308d

Please sign in to comment.