diff --git a/front/lib/api/assistant/email_trigger.ts b/front/lib/api/assistant/email_trigger.ts new file mode 100644 index 000000000000..2b2f40497657 --- /dev/null +++ b/front/lib/api/assistant/email_trigger.ts @@ -0,0 +1,465 @@ +import type { + AgentMessageType, + ConversationType, + LightAgentConfigurationType, + LightWorkspaceType, + Result, + UserType, +} from "@dust-tt/types"; +import { Err, isAgentMessageType, isDevelopment, Ok } from "@dust-tt/types"; +import { marked } from "marked"; +import sanitizeHtml from "sanitize-html"; +import { Op } from "sequelize"; + +import { getAgentConfigurations } from "@app/lib/api/assistant/configuration"; +import { + createConversation, + getConversation, + postNewContentFragment, +} from "@app/lib/api/assistant/conversation"; +import { postUserMessageWithPubSub } from "@app/lib/api/assistant/pubsub"; +import { sendEmail } from "@app/lib/api/email"; +import type { Authenticator } from "@app/lib/auth"; +import { + Conversation, + ConversationParticipant, +} from "@app/lib/models/assistant/conversation"; +import { User } from "@app/lib/models/user"; +import { Workspace } from "@app/lib/models/workspace"; +import { MembershipModel } from "@app/lib/resources/storage/models/membership"; +import { filterAndSortAgents } from "@app/lib/utils"; +import { renderLightWorkspaceType } from "@app/lib/workspace"; +import logger from "@app/logger/logger"; + +function renderUserType(user: User): UserType { + return { + sId: user.sId, + id: user.id, + createdAt: user.createdAt.getTime(), + provider: user.provider, + username: user.username, + email: user.email, + firstName: user.firstName, + lastName: user.lastName, + fullName: user.firstName + (user.lastName ? ` ${user.lastName}` : ""), + image: user.imageUrl, + }; +} + +export const ASSISTANT_EMAIL_SUBDOMAIN = isDevelopment() + ? "dev.dust.help" + : "run.dust.help"; + +export type InboundEmail = { + subject: string; + text: string; + auth: { SPF: string; dkim: string }; + envelope: { + to: string[]; + cc: string[]; + bcc: string[]; + from: string; + full: string; + }; +}; + +export type EmailTriggerError = { + type: + | "unexpected_error" + | "unauthenticated_error" + | "user_not_found" + | "workspace_not_found" + | "invalid_email_error" + | "assistant_not_found" + | "message_creation_error"; + message: string; +}; + +export function getTargetEmailsForWorkspace({ + allTargetEmails, + workspace, + isDefault, +}: { + allTargetEmails: string[]; + workspace: LightWorkspaceType; + isDefault: boolean; +}): string[] { + return allTargetEmails.filter( + (email) => + email.split("@")[0].endsWith(`[${workspace.sId}]`) || + // calls with no brackets go to default workspace + (!email.split("@")[0].endsWith("]") && isDefault) + ); +} + +export async function userAndWorkspacesFromEmail({ + email, +}: { + email: string; +}): Promise< + Result< + { + workspaces: LightWorkspaceType[]; + user: UserType; + defaultWorkspace: LightWorkspaceType; + }, + EmailTriggerError + > +> { + const user = await User.findOne({ + where: { email }, + }); + + if (!user) { + return new Err({ + type: "user_not_found", + message: + `Failed to match a valid Dust user for email: ${email}. ` + + `Please sign up for Dust at https://dust.tt to interact with assitsants over email.`, + }); + } + const workspaces = await Workspace.findAll({ + include: [ + { + model: MembershipModel, + where: { + userId: user.id, + endAt: { + [Op.or]: [{ [Op.is]: null }, { [Op.gte]: new Date() }], + }, + }, + }, + ], + }); + + if (!workspaces) { + return new Err({ + type: "workspace_not_found", + message: + `Failed to match a valid Dust workspace associated with email: ${email}. ` + + `Please sign up for Dust at https://dust.tt to interact with assistants over email.`, + }); + } + + // get latest conversation participation from user + const latestParticipation = await ConversationParticipant.findOne({ + where: { + userId: user.id, + }, + include: [ + { + model: Conversation, + }, + ], + order: [["createdAt", "DESC"]], + }); + + // TODO: implement good default logic + // a. most members? + // b. most recent activity? + const workspace = workspaces.find( + (w) => w.id === latestParticipation?.conversation?.workspaceId + ); + if (!workspace) { + return new Err({ + type: "unexpected_error", + message: "Failed to find a valid default workspace for user.", + }); + } + + const defaultWorkspace = renderLightWorkspaceType({ + workspace, + }); + + return new Ok({ + workspaces: workspaces.map((workspace) => + renderLightWorkspaceType({ workspace }) + ), + user: renderUserType(user), + defaultWorkspace, + }); +} + +export async function emailAssistantMatcher({ + auth, + targetEmail, +}: { + auth: Authenticator; + targetEmail: string; +}): Promise< + Result< + { + agentConfiguration: LightAgentConfigurationType; + }, + EmailTriggerError + > +> { + const agentConfigurations = await getAgentConfigurations({ + auth, + agentsGetView: "list", + variant: "light", + limit: undefined, + sort: undefined, + }); + + const agentPrefix = targetEmail.split("@")[0]; + + const matchingAgents = filterAndSortAgents(agentConfigurations, agentPrefix); + if (matchingAgents.length === 0) { + return new Err({ + type: "assistant_not_found", + message: `Failed to match a valid assistant with name prefix: '${agentPrefix}'.`, + }); + } + const agentConfiguration = matchingAgents[0]; + + return new Ok({ + agentConfiguration, + }); +} + +export async function splitThreadContent( + threadContent: string +): Promise<{ userMessage: string; restOfThread: string }> { + const lines = threadContent.split("\n"); + let userMessage = ""; + let restOfThread = ""; + let foundUserMessage = false; + for (const line of lines) { + if (foundUserMessage) { + restOfThread += line + "\n"; + } else { + if (line.startsWith("On ") && line.includes(" wrote:")) { + foundUserMessage = true; + } else if (line.startsWith("---------- Forwarded message ---------")) { + foundUserMessage = true; + } else { + userMessage += line + "\n"; + } + } + } + + return { userMessage: userMessage.trim(), restOfThread: restOfThread.trim() }; +} + +export async function triggerFromEmail({ + auth, + agentConfigurations, + email, +}: { + auth: Authenticator; + agentConfigurations: LightAgentConfigurationType[]; + email: InboundEmail; +}): Promise< + Result< + { + conversation: ConversationType; + answers: { + agentConfiguration: LightAgentConfigurationType; + agentMessage: AgentMessageType; + html: string; + }[]; + }, + EmailTriggerError + > +> { + const localLogger = logger.child({}); + const user = auth.user(); + if (!user) { + return new Err({ + type: "unexpected_error", + message: + "An unexpected error occurred. Please try again or contact us at team@dust.tt.", + }); + } + + let conversation = await createConversation(auth, { + title: `Email: ${email.subject}`, + visibility: "unlisted", + }); + + const { userMessage, restOfThread } = await splitThreadContent(email.text); + + // console.log("USER_MESSAGE", userMessage); + // console.log("REST_OF_THREAD", restOfThread, restOfThread.length); + + if (restOfThread.length > 0) { + await postNewContentFragment( + auth, + conversation, + { + title: `Email thread: ${email.subject}`, + content: restOfThread, + url: null, + contentType: "file_attachment", + }, + { + username: user.username, + fullName: user.fullName, + email: user.email, + profilePictureUrl: user.image, + } + ); + + const updatedConversationRes = await getConversation( + auth, + conversation.sId + ); + if (updatedConversationRes.isErr()) { + // if no conversation found, we just keep the conversation as is but do + // not err + if (updatedConversationRes.error.type !== "conversation_not_found") { + return new Err({ + type: "unexpected_error", + message: "Failed to update conversation with email thread.", + }); + } + } else { + conversation = updatedConversationRes.value; + } + } + + const content = + agentConfigurations + .map((agent) => { + return `:mention[${agent.name}]{sId=${agent.sId}}`; + }) + .join(" ") + + " " + + userMessage; + + const mentions = agentConfigurations.map((agent) => { + return { configurationId: agent.sId }; + }); + + const messageRes = await postUserMessageWithPubSub( + auth, + { + conversation, + content, + mentions, + context: { + timezone: Intl.DateTimeFormat().resolvedOptions().timeZone ?? "UTC", + username: user.username, + fullName: user.fullName, + email: user.email, + profilePictureUrl: user.image, + origin: "email", + }, + }, + { resolveAfterFullGeneration: true } + ); + + if (messageRes.isErr()) { + return new Err({ + type: "message_creation_error", + message: + `Error interacting with assistant: ` + + messageRes.error.api_error.message, + }); + } + + const updatedConversationRes = await getConversation(auth, conversation.sId); + + if (updatedConversationRes.isErr()) { + if (updatedConversationRes.error.type !== "conversation_not_found") { + return new Err({ + type: "unexpected_error", + message: "Failed to update conversation with user message.", + }); + } + } else { + conversation = updatedConversationRes.value; + } + + localLogger.info( + { + conversation: { + sId: conversation.sId, + }, + }, + "[email] Created conversation." + ); + + // console.log(conversation.content); + + // Last versions of each agent messages. + const agentMessages = agentConfigurations.map((ac) => { + const agentMessages = conversation.content.find((versions) => { + const item = versions[versions.length - 1]; + return ( + item && isAgentMessageType(item) && item.configuration.sId === ac.sId + ); + }) as AgentMessageType[]; + const last = agentMessages[agentMessages.length - 1]; + return { agentConfiguration: ac, agentMessage: last }; + }); + + const answers = await Promise.all( + agentMessages.map(async ({ agentConfiguration, agentMessage }) => { + return { + agentConfiguration, + agentMessage, + html: sanitizeHtml(await marked.parse(agentMessage.content || ""), { + // Allow images on top of all defaults from https://www.npmjs.com/package/sanitize-html + allowedTags: sanitizeHtml.defaults.allowedTags.concat(["img"]), + }), + }; + }) + ); + + return new Ok({ conversation, answers }); +} + +export async function replyToEmail({ + email, + agentConfiguration, + htmlContent, +}: { + email: InboundEmail; + agentConfiguration?: LightAgentConfigurationType; + htmlContent: string; +}) { + const name = agentConfiguration + ? `Dust Assistant (${agentConfiguration.name})` + : "Dust Assistant"; + const sender = agentConfiguration + ? `${agentConfiguration.name}@${ASSISTANT_EMAIL_SUBDOMAIN}` + : `assistants@${ASSISTANT_EMAIL_SUBDOMAIN}`; + + // subject: if Re: is there, we don't add it. + const subject = email.subject + .toLowerCase() + .replaceAll(" ", "") + .startsWith("re:") + ? email.subject + : `Re: ${email.subject}`; + + const quote = email.text + .replaceAll(">", ">") + .replaceAll("<", "<") + .split("\n") + .join("
\n"); + + const html = + "
\n" + + htmlContent + + `

` + + `On ${new Date().toUTCString()} ${email.envelope.full} wrote:
\n` + + `
\n` + + `${quote}` + + `
\n` + + "
\n"; + + const msg = { + from: { + name, + email: sender, + }, + reply_to: sender, + subject, + html, + }; + + await sendEmail(email.envelope.from, msg); +} diff --git a/front/pages/api/email/webhook.ts b/front/pages/api/email/webhook.ts new file mode 100644 index 000000000000..a434d334ff9c --- /dev/null +++ b/front/pages/api/email/webhook.ts @@ -0,0 +1,277 @@ +import type { Result, WithAPIErrorResponse } from "@dust-tt/types"; +import { Err, Ok, removeNulls } from "@dust-tt/types"; +import { IncomingForm } from "formidable"; +import type { NextApiRequest, NextApiResponse } from "next"; + +import type { + EmailTriggerError, + InboundEmail, +} from "@app/lib/api/assistant/email_trigger"; +import { + ASSISTANT_EMAIL_SUBDOMAIN, + emailAssistantMatcher, + getTargetEmailsForWorkspace, + replyToEmail, + triggerFromEmail, + userAndWorkspacesFromEmail, +} from "@app/lib/api/assistant/email_trigger"; +import { Authenticator } from "@app/lib/auth"; +import logger from "@app/logger/logger"; +import { apiError, withLogging } from "@app/logger/withlogging"; + +const { DUST_CLIENT_FACING_URL = "", EMAIL_WEBHOOK_SECRET = "" } = process.env; + +// Disabling Next.js's body parser as formidable has its own +export const config = { + api: { + bodyParser: false, + }, +}; + +// Parses the Sendgid webhook form data and validates it returning a fully formed InboundEmail. +const parseSendgridWebhookContent = async ( + req: NextApiRequest +): Promise> => { + const form = new IncomingForm(); + const [fields] = await form.parse(req); + + try { + const subject = fields["subject"] ? fields["subject"][0] : null; + const text = fields["text"] ? fields["text"][0] : null; + const full = fields["from"] ? fields["from"][0] : null; + const SPF = fields["SPF"] ? fields["SPF"][0] : null; + const dkim = fields["dkim"] ? fields["dkim"][0] : null; + const envelope = fields["envelope"] + ? JSON.parse(fields["envelope"][0]) + : null; + + if (!envelope) { + return new Err(new Error("Failed to parse envelope")); + } + + const from = envelope.from; + + if (!from || typeof from !== "string") { + return new Err(new Error("Failed to parse envelope.from")); + } + if (!full || typeof full !== "string") { + return new Err(new Error("Failed to parse from")); + } + + return new Ok({ + subject: subject || "(no subject)", + text: text || "", + auth: { SPF: SPF || "", dkim: dkim || "" }, + envelope: { + to: envelope.to || [], + cc: envelope.cc || [], + bcc: envelope.bcc || [], + from, + full, + }, + }); + } catch (e) { + return new Err(new Error("Failed to parse email content")); + } +}; + +const replyToError = async ( + email: InboundEmail, + error: EmailTriggerError +): Promise => { + logger.error( + { error, envelope: email.envelope }, + "[email] Error handling email." + ); + const htmlContent = + `

Error running assistant:

\n` + + `

(${error.type}) ${error.message}

\n`; + await replyToEmail({ email, htmlContent }); +}; + +export type PostResponseBody = { + success: boolean; +}; + +async function handler( + req: NextApiRequest, + res: NextApiResponse> +): Promise { + switch (req.method) { + case "POST": + const authHeader = req.headers.authorization; + + if (!authHeader || !authHeader.startsWith("Basic ")) { + return apiError(req, res, { + status_code: 401, + api_error: { + type: "missing_authorization_header_error", + message: "Missing Authorization header", + }, + }); + } + + const base64Credentials = authHeader.split(" ")[1]; + const credentials = Buffer.from(base64Credentials, "base64").toString( + "ascii" + ); + const [username, password] = credentials.split(":"); + + if (username !== "sendgrid" || password !== EMAIL_WEBHOOK_SECRET) { + return apiError(req, res, { + status_code: 403, + api_error: { + type: "invalid_basic_authorization_error", + message: "Invalid Authorization header", + }, + }); + } + + const emailRes = await parseSendgridWebhookContent(req); + if (emailRes.isErr()) { + return apiError(req, res, { + status_code: 401, + api_error: { + type: "invalid_request_error", + message: emailRes.error.message, + }, + }); + } + + const email = emailRes.value; + + // temporary gating: only dust.tt emails are allowed to trigger the assistant + if (!email.envelope.from.endsWith("@dust.tt")) { + return apiError(req, res, { + status_code: 401, + api_error: { + type: "invalid_request_error", + message: "Only dust.tt emails are allowed to trigger the assistant", + }, + }); + } + + // At this stage we have a valid email in we can respond 200 to the webhook, no more apiError + // possible below this point, errors should be reported to the sender. + res.status(200).json({ success: true }); + + // Check SPF is pass. + if ( + email.auth.SPF !== "pass" || + email.auth.dkim !== `{@${email.envelope.from.split("@")[1]} : pass}` + ) { + await replyToError(email, { + type: "unauthenticated_error", + message: + "Failed to authenticate your email (SPF/dkim validation failed).", + }); + return; + } + + const userRes = await userAndWorkspacesFromEmail({ + email: email.envelope.from, + }); + if (userRes.isErr()) { + await replyToError(email, userRes.error); + return; + } + + const { user, workspaces, defaultWorkspace } = userRes.value; + + // find target email in [...to, ...cc, ...bcc], that is email whose domain is + // ASSISTANT_EMAIL_SUBDOMAIN. + const allTargetEmails = [ + ...(email.envelope.to ?? []), + ...(email.envelope.cc ?? []), + ...(email.envelope.bcc ?? []), + ].filter((email) => email.endsWith(`@${ASSISTANT_EMAIL_SUBDOMAIN}`)); + + const workspacesAndEmails = workspaces + .map((workspace) => { + return { + workspace, + targetEmails: getTargetEmailsForWorkspace({ + allTargetEmails, + workspace, + isDefault: workspace.sId === defaultWorkspace.sId, + }), + }; + }) + .filter(({ targetEmails }) => (targetEmails as string[]).length > 0); + + if (workspacesAndEmails.length === 0) { + await replyToError(email, { + type: "invalid_email_error", + message: + `Failed to match any valid assistant email. ` + + `Expected assistant email format: {ASSISTANT_NAME}@${ASSISTANT_EMAIL_SUBDOMAIN}.`, + }); + } + + for (const { workspace, targetEmails } of workspacesAndEmails) { + const auth = await Authenticator.fromUserIdAndWorkspaceId( + user.sId, + workspace.sId + ); + + const agentConfigurations = removeNulls( + await Promise.all( + targetEmails.map(async (targetEmail) => { + const matchRes = await emailAssistantMatcher({ + auth, + targetEmail, + }); + if (matchRes.isErr()) { + await replyToError(email, matchRes.error); + return null; + } + + return matchRes.value.agentConfiguration; + }) + ) + ); + + if (agentConfigurations.length === 0) { + return; + } + + const answerRes = await triggerFromEmail({ + auth, + agentConfigurations, + email, + }); + + if (answerRes.isErr()) { + await replyToError(email, answerRes.error); + return; + } + + const { conversation, answers } = answerRes.value; + + answers.forEach(async (answer) => { + void replyToEmail({ + email, + agentConfiguration: answer.agentConfiguration, + htmlContent: `
${ + answer.html + }

Open in Dust
`, + }); + }); + } + return; + + default: + return apiError(req, res, { + status_code: 405, + api_error: { + type: "method_not_supported_error", + message: + "The method passed is not supported, GET or POST is expected.", + }, + }); + } +} + +export default withLogging(handler); diff --git a/types/src/front/assistant/conversation.ts b/types/src/front/assistant/conversation.ts index d8ce48b6a6bb..8bc82aca9283 100644 --- a/types/src/front/assistant/conversation.ts +++ b/types/src/front/assistant/conversation.ts @@ -57,6 +57,7 @@ export type UserMessageOrigin = | "slack" | "web" | "api" + | "email" | "gsheet" | "zapier" | "make" diff --git a/types/src/front/lib/error.ts b/types/src/front/lib/error.ts index b6799070b974..373dc9df7dab 100644 --- a/types/src/front/lib/error.ts +++ b/types/src/front/lib/error.ts @@ -10,6 +10,7 @@ export type APIErrorType = | "not_authenticated" | "missing_authorization_header_error" | "malformed_authorization_header_error" + | "invalid_basic_authorization_error" | "invalid_api_key_error" | "internal_server_error" | "invalid_request_error"