Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions apps/server/src/lib/openrouter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Buffer } from "node:buffer";
import { createCipheriv, createDecipheriv, createHash, randomBytes } from "node:crypto";
import { createCipheriv, createDecipheriv, pbkdf2Sync, randomBytes } from "node:crypto";
import { eq } from "drizzle-orm";

import { db } from "../db";
Expand Down Expand Up @@ -67,7 +67,10 @@ function getEncryptionKey() {
if (!secret || secret.length < 16) {
throw new Error("Missing OPENROUTER_API_KEY_SECRET env for encrypting OpenRouter API keys");
}
return createHash("sha256").update(secret).digest();
const salt = "openrouter:key-derivation-salt";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using a fixed salt reduces security compared to random salts. Consider generating unique salts per user or per environment.

const iterations = 100_000;
const keyLength = 32; // AES-256-GCM expects 32-byte key
return pbkdf2Sync(secret, salt, iterations, keyLength, "sha256");
}

function base64UrlEncode(buffer: Buffer) {
Expand Down
48 changes: 41 additions & 7 deletions apps/server/src/lib/posthog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,32 @@ import { withTracing } from "@posthog/ai";

let client: PostHog | null = null;

const APP_VERSION =
process.env.SERVER_APP_VERSION ??
process.env.APP_VERSION ??
process.env.NEXT_PUBLIC_APP_VERSION ??
process.env.VERCEL_GIT_COMMIT_SHA ??
"dev";

const DEPLOYMENT =
process.env.SERVER_DEPLOYMENT ??
process.env.DEPLOYMENT ??
process.env.POSTHOG_DEPLOYMENT ??
process.env.VERCEL_ENV ??
(process.env.NODE_ENV === "production" ? "prod" : "local");

const ENVIRONMENT = process.env.POSTHOG_ENVIRONMENT ?? process.env.NODE_ENV ?? "development";
const DEPLOYMENT_REGION =
process.env.POSTHOG_DEPLOYMENT_REGION ?? process.env.VERCEL_REGION ?? "local";

const BASE_SUPER_PROPERTIES = Object.freeze({
app: "openchat-server",
app_version: APP_VERSION,
deployment: DEPLOYMENT,
environment: ENVIRONMENT,
deployment_region: DEPLOYMENT_REGION,
});

function buildClient() {
const apiKey = process.env.POSTHOG_API_KEY;
if (!apiKey) return null;
Expand All @@ -13,6 +39,7 @@ function buildClient() {
flushAt: 1,
flushInterval: 5_000,
});
client.register(BASE_SUPER_PROPERTIES);
return client;
}

Expand All @@ -27,13 +54,20 @@ export function capturePosthogEvent(
) {
const instance = buildClient();
if (!instance || !distinctId) return;
instance.capture({
distinctId,
event,
properties,
}).catch((error: unknown) => {
console.error("[posthog] capture failed", error);
});
const sanitized: Record<string, unknown> = {};
for (const [key, value] of Object.entries(properties)) {
if (value === undefined) continue;
sanitized[key] = value;
}
instance
.capture({
distinctId,
event,
properties: sanitized,
})
.catch((error: unknown) => {
console.error("[posthog] capture failed", error);
});
}

export function withPosthogTracing<Model extends (...args: any[]) => any>(
Expand Down
95 changes: 68 additions & 27 deletions apps/server/src/routers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,45 +113,55 @@ export const appRouter = {
.optional(),
)
.handler(async ({ context, input }) => {
const userId = context.session!.user.id;
const id = input?.id ?? cuid();
const now = new Date();
const title = input?.title ?? "New Chat";
let storageBackend: "postgres" | "memory_fallback" = "postgres";
try {
await db.insert(chat).values({
id,
userId: context.session!.user.id,
userId,
title,
createdAt: now,
updatedAt: now,
lastMessageAt: now,
});
// emit sidebar add
publish(
`chats:index:${context.session!.user.id}`,
`chats:index:${userId}`,
"chats.index.add",
{ chatId: id, title, updatedAt: now, lastMessageAt: now },
);
} catch {
addFallbackChat(context.session!.user.id, {
storageBackend = "memory_fallback";
addFallbackChat(userId, {
id,
userId: context.session!.user.id,
userId,
title,
createdAt: now,
updatedAt: now,
lastMessageAt: now,
});
publish(
`chats:index:${context.session!.user.id}`,
`chats:index:${userId}`,
"chats.index.add",
{ chatId: id, title, updatedAt: now, lastMessageAt: now },
);
capturePosthogEvent("workspace.fallback_storage_used", userId, {
operation: "create",
chat_id: id,
fallback_size: (memChatsByUser.get(userId) ?? []).length,
workspace_id: userId,
});
}
capturePosthogEvent("chat_created", context.session!.user.id, {
chatId: id,
title,
recordedAt: now.toISOString(),
capturePosthogEvent("chat.created", userId, {
chat_id: id,
title_length: title.length,
storage_backend: storageBackend,
source: "server_router",
workspace_id: userId,
});
return { id };
return { id, storageBackend };
}),
// List chats for the current user (sorted by last activity)
list: protectedProcedure.handler(async ({ context }) => {
Expand All @@ -163,8 +173,15 @@ export const appRouter = {
.orderBy(desc(chat.lastMessageAt), desc(chat.updatedAt));
return rows;
} catch {
pruneUserChats(context.session!.user.id);
const list = memChatsByUser.get(context.session!.user.id) ?? [];
const userId = context.session!.user.id;
pruneUserChats(userId);
const list = memChatsByUser.get(userId) ?? [];
capturePosthogEvent("workspace.fallback_storage_used", userId, {
operation: "list",
chat_id: null,
fallback_size: list.length,
workspace_id: userId,
});
return list.map(({ id, title, lastMessageAt, updatedAt }) => ({ id, title, lastMessageAt, updatedAt }));
}
}),
Expand Down Expand Up @@ -231,13 +248,21 @@ export const appRouter = {
} else {
memMsgsByChat.delete(input.chatId);
}
const permissibleChats = pruneChatList(memChatsByUser.get(context.session!.user.id) ?? []);
const userId = context.session!.user.id;
const permissibleChats = pruneChatList(memChatsByUser.get(userId) ?? []);
if (permissibleChats.length > 0) {
memChatsByUser.set(context.session!.user.id, permissibleChats);
memChatsByUser.set(userId, permissibleChats);
}
const hasAccess = permissibleChats.some((c) => c.id === input.chatId);
if (!hasAccess) return [];
return (memMsgsByChat.get(input.chatId) ?? prunedMessages)
const fallbackMessages = memMsgsByChat.get(input.chatId) ?? prunedMessages;
capturePosthogEvent("workspace.fallback_storage_used", userId, {
operation: "list",
chat_id: input.chatId,
fallback_size: fallbackMessages.length,
workspace_id: userId,
});
return fallbackMessages
.map(({ id, role, content, createdAt }) => ({ id, role, content, createdAt }));
}
}),
Expand All @@ -261,6 +286,7 @@ export const appRouter = {
}),
)
.handler(async ({ context, input }) => {
const userId = context.session!.user.id;
const userCreatedAt = input.userMessage.createdAt ? new Date(input.userMessage.createdAt) : new Date();
const assistantProvided = input.assistantMessage != null;
const assistantCreatedAt = assistantProvided
Expand All @@ -275,7 +301,7 @@ export const appRouter = {
const owned = await db
.select({ id: chat.id })
.from(chat)
.where(and(eq(chat.id, input.chatId), eq(chat.userId, context.session!.user.id)));
.where(and(eq(chat.id, input.chatId), eq(chat.userId, userId)));
if (owned.length === 0) return { ok: false as const };

await db
Expand Down Expand Up @@ -333,14 +359,14 @@ export const appRouter = {
.set({ updatedAt: lastActivity, lastMessageAt: lastActivity })
.where(eq(chat.id, input.chatId));
publish(
`chats:index:${context.session!.user.id}`,
`chats:index:${userId}`,
"chats.index.update",
{ chatId: input.chatId, updatedAt: lastActivity, lastMessageAt: lastActivity },
);
return { ok: true as const, userMessageId: userMsgId, assistantMessageId: assistantMsgId };
} catch {
pruneUserChats(context.session!.user.id);
const userChats = memChatsByUser.get(context.session!.user.id) ?? [];
pruneUserChats(userId);
const userChats = memChatsByUser.get(userId) ?? [];
if (!userChats.some((c) => c.id === input.chatId)) return { ok: false as const };
addFallbackMessage(input.chatId, {
id: userMsgId,
Expand Down Expand Up @@ -377,12 +403,19 @@ export const appRouter = {
record.updatedAt = latest;
record.lastMessageAt = latest;
}
memChatsByUser.set(context.session!.user.id, pruneChatList(owned));
memChatsByUser.set(userId, pruneChatList(owned));
publish(
`chats:index:${context.session!.user.id}`,
`chats:index:${userId}`,
"chats.index.update",
{ chatId: input.chatId, updatedAt: assistantCreatedAt ?? userCreatedAt, lastMessageAt: assistantCreatedAt ?? userCreatedAt },
);
const fallbackMessages = memMsgsByChat.get(input.chatId) ?? [];
capturePosthogEvent("workspace.fallback_storage_used", userId, {
operation: "send",
chat_id: input.chatId,
fallback_size: fallbackMessages.length,
workspace_id: userId,
});
return { ok: true as const, userMessageId: userMsgId, assistantMessageId: assistantMsgId };
}
}),
Expand All @@ -398,6 +431,7 @@ export const appRouter = {
}),
)
.handler(async ({ context, input }) => {
const userId = context.session!.user.id;
const createdAt = input.createdAt ? new Date(input.createdAt) : new Date();
const now = new Date();
const content = input.content ?? '';
Expand All @@ -408,7 +442,7 @@ export const appRouter = {
const owned = await db
.select({ id: chat.id })
.from(chat)
.where(and(eq(chat.id, input.chatId), eq(chat.userId, context.session!.user.id)));
.where(and(eq(chat.id, input.chatId), eq(chat.userId, userId)));
if (owned.length === 0) return { ok: false as const };

let inserted = false;
Expand Down Expand Up @@ -456,7 +490,7 @@ export const appRouter = {
}

publish(
`chats:index:${context.session!.user.id}`,
`chats:index:${userId}`,
'chats.index.update',
sidebarPayload,
);
Expand All @@ -477,8 +511,8 @@ export const appRouter = {

return { ok: true as const };
} catch {
pruneUserChats(context.session!.user.id);
const userChats = memChatsByUser.get(context.session!.user.id) ?? [];
pruneUserChats(userId);
const userChats = memChatsByUser.get(userId) ?? [];
if (!userChats.some((c) => c.id === input.chatId)) return { ok: false as const };

const existingMessages = memMsgsByChat.get(input.chatId) ?? [];
Expand Down Expand Up @@ -514,7 +548,7 @@ export const appRouter = {
record.lastMessageAt = createdAt;
}
}
memChatsByUser.set(context.session!.user.id, pruneChatList(userChats));
memChatsByUser.set(userId, pruneChatList(userChats));

publish(
`chat:${input.chatId}`,
Expand All @@ -529,6 +563,13 @@ export const appRouter = {
updatedAt: now,
},
);
const fallbackMessages = memMsgsByChat.get(input.chatId) ?? [];
capturePosthogEvent("workspace.fallback_storage_used", userId, {
operation: "streamUpsert",
chat_id: input.chatId,
fallback_size: fallbackMessages.length,
workspace_id: userId,
});
return { ok: true as const };
}
}),
Expand Down
Loading