Skip to content

Commit

Permalink
refactor: Use reply-to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
amaury1093 committed Dec 11, 2023
1 parent d51e0de commit 1b9bece
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 138 deletions.
13 changes: 0 additions & 13 deletions src/components/Demo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,6 @@ export function Demo({ onVerified }: DemoProps): React.ReactElement {
setLoading(false);
return onVerified && onVerified(r);
})
.catch((err: Error) => {
// Message can be:
// Results contain 0 rows
// The result contains 0 rows
if (err.message.includes("0 rows")) {
throw new Error(
`The email ${email} can't be verified within 1 minute. This is because the email provider imposes obstacles to prevent real-time email verification, such as greylisting.
Please try again later.`
);
}

throw err;
})
.catch((err: Error) => {
sentryException(err);
alertError(email, err.message);
Expand Down
69 changes: 0 additions & 69 deletions src/pages/api/calls/webhook.ts

This file was deleted.

125 changes: 69 additions & 56 deletions src/pages/api/v0/check_email.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import type { CheckEmailInput, CheckEmailOutput } from "@reacherhq/api";
import { PostgrestError } from "@supabase/supabase-js";
import { NextApiRequest, NextApiResponse } from "next";
import { v4 } from "uuid";
import amqplib from "amqplib";
import dns from "dns/promises";

import { checkUserInDB, cors } from "@/util/api";
import { getWebappURL } from "@/util/helpers";
import { updateSendinblue } from "@/util/sendinblue";
import { sentryException } from "@/util/sentry";
import { SupabaseCall } from "@/util/supabaseClient";
import { supabaseAdmin } from "@/util/supabaseServer";
import { WebhookExtra } from "../calls/webhook";

const TIMEOUT = 50000;
const MAX_PRIORITY = 5; // Higher is faster, 5 is max.
Expand Down Expand Up @@ -54,6 +51,55 @@ const POST = async (
const ch1 = await conn.createChannel().catch((err) => {
throw new Error(`Error creating RabbitMQ channel: ${err.message}`);
});

// Listen to the reply on this reply queue.
// Follow https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html
const replyQ = await ch1.assertQueue("", {
exclusive: true,
});
await ch1.consume(
replyQ.queue,
async function (msg) {
if (msg?.properties.correlationId === verificationId) {
const output = JSON.parse(msg.content.toString());

// Add to supabase
const response = await supabaseAdmin
.from<SupabaseCall>("calls")
.insert({
endpoint: "/v0/check_email",
user_id: user.id,
backend: output.debug?.server_name,
domain: output.syntax.domain,
verification_id: verificationId,
duration: Math.round(
(output.debug?.duration.secs || 0) * 1000 +
(output.debug?.duration.nanos || 0) /
1000000
),
is_reachable: output.is_reachable,
verif_method:
output.debug?.smtp?.verif_method?.type,
result: removeSensitiveData(output),
});
if (response.error) {
res.status(response.status).json(response.error);
return;
}

await ch1.close();
await conn.close();

await updateSendinblue(user);

res.status(200).json(output);
}
},
{
noAck: true,
}
);

const verifMethod = await getVerifMethod(req.body as CheckEmailInput);
const queueName = `check_email.${
// If the verifMethod is "Api", we use the "Headless" queue instead,
Expand All @@ -74,67 +120,23 @@ const POST = async (
Buffer.from(
JSON.stringify({
input: req.body as CheckEmailInput,
webhook: {
url: `${getWebappURL()}/api/calls/webhook`,
extra: {
userId: user.id,
endpoint: "/v0/check_email",
verificationId: verificationId,
} as WebhookExtra,
},
})
),
{
contentType: "application/json",
priority: MAX_PRIORITY,
correlationId: verificationId,
replyTo: replyQ.queue,
}
);

await ch1.close();

// Poll the database to make sure the call was added.
let checkEmailOutput: CheckEmailOutput | undefined;
let lastError: PostgrestError | Error | null = new Error(
"Timeout verifying email."
);

const startTime = Date.now();
while (!checkEmailOutput && Date.now() - startTime < TIMEOUT - 2000) {
await new Promise((resolve) => setTimeout(resolve, 500));

const response = await supabaseAdmin
.from<SupabaseCall>("calls")
.select("*")
.eq("verification_id", verificationId)
.single();

// If there's no error, it means the result has been added to the
// database.
lastError = response.error;
if (!response.error) {
checkEmailOutput = response.data.result;
break;
}
}

if (lastError) {
res.status(500).json({
...lastError,
error: lastError.message,
});
return;
}

if (!checkEmailOutput) {
res.status(500).json({
error: "Column result was not populated.",
setTimeout(() => {
res.status(504).json({
error: `The email ${
(req.body as CheckEmailInput).to_email
} can't be verified within 1 minute. This is because the email provider imposes obstacles to prevent real-time email verification, such as greylisting. Please try again later.`,
});
return;
}

res.status(200).json(checkEmailOutput);

// Update the LAST_API_CALL field in Sendinblue.
await updateSendinblue(user);
}, TIMEOUT);
} catch (err) {
sentryException(err as Error);
res.status(500).json({
Expand Down Expand Up @@ -177,3 +179,14 @@ async function getVerifMethod(input: CheckEmailInput): Promise<string> {
return "Smtp";
}
}

// Remove sensitive data before storing to DB.
function removeSensitiveData(output: CheckEmailOutput): CheckEmailOutput {
const newOutput = { ...output };

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
delete newOutput.debug?.server_name;

return newOutput;
}

1 comment on commit 1b9bece

@vercel
Copy link

@vercel vercel bot commented on 1b9bece Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

webapp – ./

webapp-reacher.vercel.app
api.reacher.email
webapp-git-master-reacher.vercel.app
app.reacher.email

Please sign in to comment.