Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync workspace usage w/ hourly cron instead of on-demand #1997

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions apps/web/app/api/cron/usage/sync/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { verifyVercelSignature } from "@/lib/cron/verify-vercel";
import { conn } from "@/lib/planetscale";
import { tb } from "@/lib/tinybird";
import z from "@/lib/zod";
import { log } from "@dub/utils";
import { NextResponse } from "next/server";

/*
This route is used to sync the usage stats of each workspace.
Runs once every hour (0 * * * *)
*/
export const dynamic = "force-dynamic";

async function handler(req: Request) {
try {
if (req.method === "GET") {
await verifyVercelSignature(req);
} else if (req.method === "POST") {
await verifyQstashSignature({
req,
rawBody: await req.text(),
});
}

const pipe = tb.buildPipe({
pipe: "v2_usage_sync",
data: z.object({
workspaceId: z.string(),
clicks: z.number(),
}),
});

const response = await pipe({});
const data = response.data;

// Process in batches of 100, since this can be >1000 rows to update
const BATCH_SIZE = 100;
for (let i = 0; i < data.length; i += BATCH_SIZE) {
const batch = data.slice(i, i + BATCH_SIZE);
await Promise.all(
batch.map(async (data) => {
await conn.execute(
`UPDATE Project SET usage = usage + ? WHERE id = ?`,
[data.clicks, data.workspaceId],
);
}),
);
// Add a 1 second delay between batches
await new Promise((resolve) => setTimeout(resolve, 1000));
}

return NextResponse.json(response.data);
} catch (error) {
await log({
message: `Error updating usage: ${error.message}`,
type: "cron",
});

return handleAndReturnErrorResponse(error);
}
}

export { handler as GET, handler as POST };
10 changes: 1 addition & 9 deletions apps/web/lib/tinybird/record-click.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export async function recordClick({

const hasWebhooks = webhookIds && webhookIds.length > 0;

const [, , , , workspaceRows] = await Promise.all([
const [, , , workspaceRows] = await Promise.all([
fetch(
`${process.env.TINYBIRD_API_URL}/v0/events?name=dub_click_events&wait=true`,
{
Expand All @@ -154,14 +154,6 @@ export async function recordClick({
"UPDATE Link SET clicks = clicks + 1, lastClicked = NOW() WHERE id = ?",
[linkId],
),
// if the link has a destination URL, increment the usage count for the workspace
// and then we have a cron that will reset it at the start of new billing cycle
url &&
conn.execute(
"UPDATE Project p JOIN Link l ON p.id = l.projectId SET p.usage = p.usage + 1 WHERE l.id = ?",
[linkId],
),

// fetch the workspace usage for the workspace
workspaceId && hasWebhooks
? conn.execute(
Expand Down
4 changes: 4 additions & 0 deletions apps/web/vercel.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
"path": "/api/cron/usage",
"schedule": "0 12 * * *"
},
{
"path": "/api/cron/usage/sync",
"schedule": "0 * * * *"
},
{
"path": "/api/cron/cleanup",
"schedule": "0 */6 * * *"
Expand Down
Loading