Skip to content

Commit

Permalink
Refactor worker entrypoint (#794)
Browse files Browse the repository at this point in the history
* refactor

* move initial tag

* Adjust tests

* Fix groupVersions
  • Loading branch information
ludeeus authored Oct 23, 2023
1 parent 58a2b78 commit e69241e
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 255 deletions.
20 changes: 20 additions & 0 deletions worker/src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ export const BRANDS_DOMAINS_URL =
"https://brands.home-assistant.io/domains.json";
export const VERSION_URL = "https://version.home-assistant.io/dev.json";

export interface WorkerEnv {
KV: KVNamespace;
NETLIFY_BUILD_HOOK: string;
SENTRY_DSN: string;
WORKER_ENV: string;
}

export interface WorkerEvent {
env: WorkerEnv;
ctx: ExecutionContext;
}

export interface FetchWorkerEvent extends WorkerEvent {
request: CfRequest;
}

export interface ScheduledWorkerEvent extends WorkerEvent {
controller: ScheduledController;
}

export enum UuidMetadataKey {
ADDED = "a",
COUNTRY = "c",
Expand Down
17 changes: 11 additions & 6 deletions worker/src/handlers/post.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Receive data from a Home Assistant installation
import { Toucan } from "toucan-js";
import {
CfRequest,
FetchWorkerEvent,
generateUuidMetadata,
IncomingPayload,
KV_PREFIX_UUID,
Expand All @@ -16,11 +16,11 @@ const expirationTtl = 5184000;
const withRegion = new Set(["US"]);

export async function handlePostWrapper(
request: CfRequest,
event: FetchWorkerEvent,
sentry: Toucan
): Promise<Response> {
try {
return await handlePost(request, sentry);
return await handlePost(event, sentry);
} catch (e: any) {
const captureId = sentry.captureException(e);
console.error(`${e?.message} (${captureId})`);
Expand All @@ -29,9 +29,10 @@ export async function handlePostWrapper(
}

export async function handlePost(
request: CfRequest,
event: FetchWorkerEvent,
sentry: Toucan
): Promise<Response> {
const { request } = event;
let incomingPayload;
sentry.addBreadcrumb({ message: "Process started" });
const request_json = await request.json<Record<string, any>>();
Expand Down Expand Up @@ -64,11 +65,12 @@ export async function handlePost(
const stored: {
value?: IncomingPayload | null;
metadata?: UuidMetadata | null;
} = await KV.getWithMetadata(storageKey, "json");
} = await event.env.KV.getWithMetadata(storageKey, "json");

if (!stored || !stored.value) {
sentry.addBreadcrumb({ message: "First contact for UUID, store payload" });
await storePayload(
event,
storageKey,
incomingPayload,
stringifiedPayload,
Expand All @@ -89,6 +91,7 @@ export async function handlePost(
if (!deepEqual(stored.value, JSON.parse(stringifiedPayload))) {
sentry.addBreadcrumb({ message: "Payload changed, update stored data" });
await storePayload(
event,
storageKey,
incomingPayload,
stringifiedPayload,
Expand All @@ -107,6 +110,7 @@ export async function handlePost(
});

await storePayload(
event,
storageKey,
incomingPayload,
stringifiedPayload,
Expand All @@ -120,13 +124,14 @@ export async function handlePost(
}

async function storePayload(
event: FetchWorkerEvent,
storageKey: string,
payload: IncomingPayload,
stringifiedPayload: string,
currentTimestamp: number,
metadata?: UuidMetadata | null
) {
await KV.put(storageKey, stringifiedPayload, {
await event.env.KV.put(storageKey, stringifiedPayload, {
expirationTtl,
metadata: generateUuidMetadata(payload, currentTimestamp, metadata),
});
Expand Down
82 changes: 49 additions & 33 deletions worker/src/handlers/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,28 @@ import {
BRANDS_DOMAINS_URL,
VERSION_URL,
VersionResponse,
ScheduledWorkerEvent,
} from "../data";
import { groupVersions } from "../utils/group-versions";
import { median } from "../utils/median";
import { migrateAnalyticsData } from "../utils/migrate";

export async function handleSchedule(
event: ScheduledEvent,
event: ScheduledWorkerEvent,
sentry: Toucan
): Promise<void> {
const scheduledTask = event.cron;
const scheduledTask = event.controller.cron;

try {
if (scheduledTask === ScheduledTask.PROCESS_QUEUE) {
// Runs every 2 minutes
await processQueue(sentry);
await processQueue(event, sentry);
} else if (scheduledTask === ScheduledTask.RESET_QUEUE) {
// Runs every day
await resetQueue(sentry);
await resetQueue(event, sentry);
} else if (scheduledTask === ScheduledTask.UPDATE_HISTORY) {
// Runs every hour
await updateHistory(sentry);
await updateHistory(event, sentry);
} else {
throw new Error(`Unexpected schedule task: ${scheduledTask}`);
}
Expand All @@ -55,46 +56,55 @@ export async function handleSchedule(
}
}

const getQueueData = async (): Promise<Queue> =>
(await KV.get<Queue>(KV_KEY_QUEUE, "json")) || createQueueDefaults();
const getQueueData = async (event: ScheduledWorkerEvent): Promise<Queue> =>
(await event.env.KV.get<Queue>(KV_KEY_QUEUE, "json")) ||
createQueueDefaults();

const getAnalyticsData = async (): Promise<AnalyticsData> => {
const data = await KV.get(KV_KEY_CORE_ANALYTICS, "json");
const getAnalyticsData = async (
event: ScheduledWorkerEvent
): Promise<AnalyticsData> => {
const data = await event.env.KV.get(KV_KEY_CORE_ANALYTICS, "json");
return migrateAnalyticsData(data);
};

async function resetQueue(sentry: Toucan): Promise<void> {
async function resetQueue(
event: ScheduledWorkerEvent,
sentry: Toucan
): Promise<void> {
sentry.setTag("scheduled-task", "RESET_QUEUE");
sentry.addBreadcrumb({ message: "Process started" });
const queue = await getQueueData();
const queue = await getQueueData(event);

sentry.setExtra("queue", queue);

if (queue.entries.length === 0 && queue.process_complete) {
sentry.addBreadcrumb({ message: "Store reset queue" });
await KV.put(KV_KEY_QUEUE, JSON.stringify(createQueueDefaults()));
await event.env.KV.put(KV_KEY_QUEUE, JSON.stringify(createQueueDefaults()));
}
sentry.addBreadcrumb({ message: "Process complete" });
}

async function updateHistory(sentry: Toucan): Promise<void> {
async function updateHistory(
event: ScheduledWorkerEvent,
sentry: Toucan
): Promise<void> {
sentry.setTag("scheduled-task", "UPDATE_HISTORY");
sentry.addBreadcrumb({ message: "Process started" });
let data = createQueueData();

sentry.addBreadcrumb({ message: "Get current data" });
const analyticsData = await getAnalyticsData();
const analyticsData = await getAnalyticsData(event);
const timestamp = new Date().getTime();
const timestampString = String(timestamp);

sentry.addBreadcrumb({ message: "List UUID entries" });
const kv_list = await listKV(KV_PREFIX_UUID);
const kv_list = await listKV(event, KV_PREFIX_UUID);

const missingMetata = kv_list.filter((entry) => !entry.metadata);

async function handleMissingMetadata(entry: ListEntry) {
const value = await KV.get<IncomingPayload>(entry.name, "json");
await KV.put(entry.name, JSON.stringify(value), {
const value = await event.env.KV.get<IncomingPayload>(entry.name, "json");
await event.env.KV.put(entry.name, JSON.stringify(value), {
expiration: entry.expiration,
metadata: generateUuidMetadata(value!, timestamp),
});
Expand Down Expand Up @@ -136,26 +146,29 @@ async function updateHistory(sentry: Toucan): Promise<void> {
timestamp: timestampString,
active_installations,
installation_types: data.installation_types,
versions: groupVersions(data.versions),
versions: groupVersions(event, data.versions),
});

sentry.addBreadcrumb({ message: "Trigger Netlify build" });
const resp = await fetch(NETLIFY_BUILD_HOOK, { method: "POST" });
const resp = await fetch(event.env.NETLIFY_BUILD_HOOK, { method: "POST" });
if (!resp.ok) {
throw new Error("Failed to call Netlify build hook");
}

sentry.addBreadcrumb({ message: "Store data" });
await KV.put(KV_KEY_CORE_ANALYTICS, JSON.stringify(analyticsData));
await event.env.KV.put(KV_KEY_CORE_ANALYTICS, JSON.stringify(analyticsData));

sentry.addBreadcrumb({ message: "Process complete" });
}

async function processQueue(sentry: Toucan): Promise<void> {
async function processQueue(
event: ScheduledWorkerEvent,
sentry: Toucan
): Promise<void> {
sentry.setTag("scheduled-task", "PROCESS_QUEUE");
sentry.addBreadcrumb({ message: "Process started" });
let maxWorkerInvocations = KV_MAX_PROCESS_ENTRIES;
let queue = await getQueueData();
let queue = await getQueueData(event);

sentry.setExtra("queue", queue);

Expand All @@ -174,7 +187,7 @@ async function processQueue(sentry: Toucan): Promise<void> {
// Reset queue
queue = createQueueDefaults();

const kv_list = await listKV(KV_PREFIX_UUID);
const kv_list = await listKV(event, KV_PREFIX_UUID);
maxWorkerInvocations -= Math.floor(kv_list.length / 1000);

sentry.addBreadcrumb({
Expand Down Expand Up @@ -227,7 +240,7 @@ async function processQueue(sentry: Toucan): Promise<void> {

async function handleEntry(entryKey: string) {
let entryData;
entryData = await KV.get<IncomingPayload>(entryKey, "json");
entryData = await event.env.KV.get<IncomingPayload>(entryKey, "json");

if (entryData !== undefined && entryData !== null) {
queue.data = combineEntryData(
Expand Down Expand Up @@ -257,7 +270,7 @@ async function processQueue(sentry: Toucan): Promise<void> {
const timestampString = String(timestamp);

const queue_data = processQueueData(queue.data);
const storedAnalytics = await getAnalyticsData();
const storedAnalytics = await getAnalyticsData(event);

storedAnalytics.current = {
...queue_data,
Expand All @@ -266,38 +279,41 @@ async function processQueue(sentry: Toucan): Promise<void> {
};

sentry.addBreadcrumb({ message: "Trigger Netlify build" });
const resp = await fetch(NETLIFY_BUILD_HOOK, { method: "POST" });
const resp = await fetch(event.env.NETLIFY_BUILD_HOOK, { method: "POST" });
if (!resp.ok) {
throw new Error("Failed to call Netlify build hook");
}

sentry.addBreadcrumb({ message: "Store data" });
await Promise.all([
KV.put(
event.env.KV.put(
`${KV_PREFIX_HISTORY}:${timestampString}`,
JSON.stringify(queue_data)
),
KV.put(KV_KEY_CORE_ANALYTICS, JSON.stringify(storedAnalytics)),
KV.put(
event.env.KV.put(KV_KEY_CORE_ANALYTICS, JSON.stringify(storedAnalytics)),
event.env.KV.put(
KV_KEY_CUSTOM_INTEGRATIONS,
JSON.stringify(queue.data.custom_integrations)
),
KV.put(KV_KEY_ADDONS, JSON.stringify(queue.data.addons)),
event.env.KV.put(KV_KEY_ADDONS, JSON.stringify(queue.data.addons)),
]);

queue = createQueueDefaults();
queue.process_complete = true;
}
sentry.addBreadcrumb({ message: "Process complete" });
await KV.put(KV_KEY_QUEUE, JSON.stringify(queue));
await event.env.KV.put(KV_KEY_QUEUE, JSON.stringify(queue));
}

async function listKV(prefix: string): Promise<ListEntry[]> {
async function listKV(
event: ScheduledWorkerEvent,
prefix: string
): Promise<ListEntry[]> {
let entries: ListEntry[] = [];

let lastResponse;
while (lastResponse === undefined || !lastResponse.list_complete) {
lastResponse = await KV.list({
lastResponse = await event.env.KV.list({
prefix,
cursor: lastResponse !== undefined ? lastResponse.cursor : undefined,
});
Expand Down
60 changes: 35 additions & 25 deletions worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,50 @@
import { Toucan } from "toucan-js";
import { handlePostWrapper } from "./handlers/post";
import { handleSchedule } from "./handlers/schedule";
import { FetchWorkerEvent, ScheduledWorkerEvent } from "./data";

declare global {
const KV: KVNamespace;
const NETLIFY_BUILD_HOOK: string;
const SENTRY_DSN: string;
const WORKER_ENV: string;
}

const sentryClient = (event: FetchEvent | ScheduledEvent, handler: string) => {
const sentryClient = (
event: FetchWorkerEvent | ScheduledWorkerEvent,
handler: string
) => {
const client = new Toucan({
dsn: SENTRY_DSN,
dsn: event.env.SENTRY_DSN,
requestDataOptions: {
allowedHeaders: ["user-agent", "cf-ray"],
},
// request does not exist on ScheduledEvent
request: "request" in event ? event.request : undefined,
context: event,
environment: WORKER_ENV,
context: event.ctx,
environment: event.env.WORKER_ENV,
initialScope: {
tags: {
handler,
},
},
});
client.setTag("handler", handler);

return client;
};

addEventListener("fetch", (event: FetchEvent) => {
if (event.request.method === "POST") {
event.respondWith(
handlePostWrapper(event.request, sentryClient(event, "post"))
);
} else {
event.respondWith(new Response(null, { status: 405 }));
}
});

addEventListener("scheduled", (event) => {
event.waitUntil(handleSchedule(event, sentryClient(event, "schedule")));
});
export default {
fetch: async (
request: FetchWorkerEvent["request"],
env: FetchWorkerEvent["env"],
ctx: FetchWorkerEvent["ctx"]
) => {
const event: FetchWorkerEvent = { request, env, ctx };
if (request.method === "POST") {
return await handlePostWrapper(event, sentryClient(event, "post"));
} else {
return new Response(null, { status: 405 });
}
},
scheduled: async (
controller: ScheduledWorkerEvent["controller"],
env: ScheduledWorkerEvent["env"],
ctx: ScheduledWorkerEvent["ctx"]
) => {
const event: ScheduledWorkerEvent = { controller, env, ctx };
await handleSchedule(event, sentryClient(event, "schedule"));
},
} as ExportedHandler;
Loading

0 comments on commit e69241e

Please sign in to comment.