From cfb96859b3057665457e90eca4ddd2d76cc8a6e8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 24 Oct 2024 10:41:49 +0100 Subject: [PATCH] fix: removing schedule instance no longer crashes (#1430) --- .../route.tsx | 7 +- .../v3/services/upsertTaskSchedule.server.ts | 290 +++++++++--------- internal-packages/database/README.md | 2 +- .../migration.sql | 2 + .../database/prisma/schema.prisma | 1 + references/v3-catalog/src/management.ts | 4 +- 6 files changed, 156 insertions(+), 150 deletions(-) create mode 100644 internal-packages/database/prisma/migrations/20241023154826_add_schedule_instance_id_index_to_task_run/migration.sql diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.schedules.new/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.schedules.new/route.tsx index 402912320b..cfea229d40 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.schedules.new/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.schedules.new/route.tsx @@ -44,6 +44,7 @@ import { CronPattern, UpsertSchedule } from "~/v3/schedules"; import { UpsertTaskScheduleService } from "~/v3/services/upsertTaskSchedule.server"; import { AIGeneratedCronField } from "../resources.orgs.$organizationSlug.projects.$projectParam.schedules.new.natural-language"; import { TimezoneList } from "~/components/scheduled/timezones"; +import { logger } from "~/services/logger.server"; const cronFormat = `* * * * * ┬ ┬ ┬ ┬ ┬ @@ -94,9 +95,9 @@ export const action = async ({ request, params }: ActionFunctionArgs) => { submission.value?.friendlyId === result.id ? "Schedule updated" : "Schedule created" ); } catch (error: any) { - const errorMessage = `Failed: ${ - error instanceof Error ? error.message : JSON.stringify(error) - }`; + logger.error("Failed to create schedule", error); + + const errorMessage = `Something went wrong. Please try again.`; return redirectWithErrorMessage( v3SchedulesPath({ slug: organizationSlug }, { slug: projectParam }), request, diff --git a/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts b/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts index 9eb73a2f19..2d7585e23a 100644 --- a/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts +++ b/apps/webapp/app/v3/services/upsertTaskSchedule.server.ts @@ -1,7 +1,7 @@ import { Prisma, TaskSchedule } from "@trigger.dev/database"; import cronstrue from "cronstrue"; import { nanoid } from "nanoid"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; +import { $transaction } from "~/db.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { UpsertSchedule } from "../schedules"; import { calculateNextScheduledTimestamp } from "../utils/calculateNextSchedule.server"; @@ -31,124 +31,45 @@ export class UpsertTaskScheduleService extends BaseService { const checkSchedule = new CheckScheduleService(this._prisma); await checkSchedule.call(projectId, schedule); - const result = await $transaction(this._prisma, async (tx) => { - const deduplicationKey = - typeof schedule.deduplicationKey === "string" && schedule.deduplicationKey !== "" - ? schedule.deduplicationKey - : nanoid(24); + const deduplicationKey = + typeof schedule.deduplicationKey === "string" && schedule.deduplicationKey !== "" + ? schedule.deduplicationKey + : nanoid(24); - const existingSchedule = schedule.friendlyId - ? await tx.taskSchedule.findUnique({ - where: { - friendlyId: schedule.friendlyId, - }, - }) - : await tx.taskSchedule.findUnique({ - where: { - projectId_deduplicationKey: { - projectId, - deduplicationKey, - }, + const existingSchedule = schedule.friendlyId + ? await this._prisma.taskSchedule.findUnique({ + where: { + friendlyId: schedule.friendlyId, + }, + }) + : await this._prisma.taskSchedule.findUnique({ + where: { + projectId_deduplicationKey: { + projectId, + deduplicationKey, }, - }); + }, + }); + const result = await (async (tx) => { if (existingSchedule) { if (existingSchedule.type === "DECLARATIVE") { throw new ServiceValidationError("Cannot update a declarative schedule"); } - return await this.#updateExistingSchedule(tx, existingSchedule, schedule, projectId); + return await this.#updateExistingSchedule(existingSchedule, schedule); } else { - return await this.#createNewSchedule(tx, schedule, projectId, deduplicationKey); + return await this.#createNewSchedule(schedule, projectId, deduplicationKey); } - }); + })(); if (!result) { - throw new Error("Failed to create or update the schedule"); - } - - const { scheduleRecord, instances } = result; - - return this.#createReturnObject(scheduleRecord, instances); - } - - async #createNewSchedule( - tx: PrismaClientOrTransaction, - options: UpsertTaskScheduleServiceOptions, - projectId: string, - deduplicationKey: string - ) { - const scheduleRecord = await tx.taskSchedule.create({ - data: { - projectId, - friendlyId: generateFriendlyId("sched"), - taskIdentifier: options.taskIdentifier, - deduplicationKey, - userProvidedDeduplicationKey: - options.deduplicationKey !== undefined && options.deduplicationKey !== "", - generatorExpression: options.cron, - generatorDescription: cronstrue.toString(options.cron), - timezone: options.timezone ?? "UTC", - externalId: options.externalId ? options.externalId : undefined, - }, - }); - - const registerNextService = new RegisterNextTaskScheduleInstanceService(tx); - - //create the instances (links to environments) - let instances: InstanceWithEnvironment[] = []; - for (const environmentId of options.environments) { - const instance = await tx.taskScheduleInstance.create({ - data: { - taskScheduleId: scheduleRecord.id, - environmentId, - }, - include: { - environment: { - include: { - orgMember: { - include: { - user: true, - }, - }, - }, - }, - }, - }); - - await registerNextService.call(instance.id); - - instances.push(instance); + throw new ServiceValidationError("Failed to create or update schedule"); } - return { scheduleRecord, instances }; - } + const { scheduleRecord } = result; - async #updateExistingSchedule( - tx: PrismaClientOrTransaction, - existingSchedule: TaskSchedule, - options: UpsertTaskScheduleServiceOptions, - projectId: string - ) { - //update the schedule - const scheduleRecord = await tx.taskSchedule.update({ - where: { - id: existingSchedule.id, - }, - data: { - generatorExpression: options.cron, - generatorDescription: cronstrue.toString(options.cron), - timezone: options.timezone ?? "UTC", - externalId: options.externalId ? options.externalId : null, - }, - }); - - const scheduleHasChanged = - scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || - scheduleRecord.timezone !== existingSchedule.timezone; - - // find the existing instances - const existingInstances = await tx.taskScheduleInstance.findMany({ + const instances = await this._prisma.taskScheduleInstance.findMany({ where: { taskScheduleId: scheduleRecord.id, }, @@ -165,18 +86,35 @@ export class UpsertTaskScheduleService extends BaseService { }, }); - // create the new instances - const newInstances: InstanceWithEnvironment[] = []; - const updatingInstances: InstanceWithEnvironment[] = []; + return this.#createReturnObject(scheduleRecord, instances); + } + + async #createNewSchedule( + options: UpsertTaskScheduleServiceOptions, + projectId: string, + deduplicationKey: string + ) { + return await $transaction(this._prisma, async (tx) => { + const scheduleRecord = await tx.taskSchedule.create({ + data: { + projectId, + friendlyId: generateFriendlyId("sched"), + taskIdentifier: options.taskIdentifier, + deduplicationKey, + userProvidedDeduplicationKey: + options.deduplicationKey !== undefined && options.deduplicationKey !== "", + generatorExpression: options.cron, + generatorDescription: cronstrue.toString(options.cron), + timezone: options.timezone ?? "UTC", + externalId: options.externalId ? options.externalId : undefined, + }, + }); + + const registerNextService = new RegisterNextTaskScheduleInstanceService(tx); - for (const environmentId of options.environments) { - const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); + //create the instances (links to environments) - if (existingInstance) { - // Update the existing instance - updatingInstances.push(existingInstance); - } else { - // Create a new instance + for (const environmentId of options.environments) { const instance = await tx.taskScheduleInstance.create({ data: { taskScheduleId: scheduleRecord.id, @@ -195,39 +133,21 @@ export class UpsertTaskScheduleService extends BaseService { }, }); - newInstances.push(instance); + await registerNextService.call(instance.id); } - } - - // find the instances that need to be removed - const instancesToDeleted = existingInstances.filter( - (i) => !options.environments.includes(i.environmentId) - ); - // delete the instances no longer selected - for (const instance of instancesToDeleted) { - await tx.taskScheduleInstance.delete({ - where: { - id: instance.id, - }, - }); - } - - const registerService = new RegisterNextTaskScheduleInstanceService(tx); - - for (const instance of newInstances) { - await registerService.call(instance.id); - } - - if (scheduleHasChanged) { - for (const instance of updatingInstances) { - await registerService.call(instance.id); - } - } + return { scheduleRecord }; + }); + } - const instances = await tx.taskScheduleInstance.findMany({ + async #updateExistingSchedule( + existingSchedule: TaskSchedule, + options: UpsertTaskScheduleServiceOptions + ) { + // find the existing instances + const existingInstances = await this._prisma.taskScheduleInstance.findMany({ where: { - taskScheduleId: scheduleRecord.id, + taskScheduleId: existingSchedule.id, }, include: { environment: { @@ -242,7 +162,89 @@ export class UpsertTaskScheduleService extends BaseService { }, }); - return { scheduleRecord, instances }; + return await $transaction( + this._prisma, + async (tx) => { + const scheduleRecord = await tx.taskSchedule.update({ + where: { + id: existingSchedule.id, + }, + data: { + generatorExpression: options.cron, + generatorDescription: cronstrue.toString(options.cron), + timezone: options.timezone ?? "UTC", + externalId: options.externalId ? options.externalId : null, + }, + }); + + const scheduleHasChanged = + scheduleRecord.generatorExpression !== existingSchedule.generatorExpression || + scheduleRecord.timezone !== existingSchedule.timezone; + + // create the new instances + const newInstances: InstanceWithEnvironment[] = []; + const updatingInstances: InstanceWithEnvironment[] = []; + + for (const environmentId of options.environments) { + const existingInstance = existingInstances.find((i) => i.environmentId === environmentId); + + if (existingInstance) { + // Update the existing instance + updatingInstances.push(existingInstance); + } else { + // Create a new instance + const instance = await tx.taskScheduleInstance.create({ + data: { + taskScheduleId: scheduleRecord.id, + environmentId, + }, + include: { + environment: { + include: { + orgMember: { + include: { + user: true, + }, + }, + }, + }, + }, + }); + + newInstances.push(instance); + } + } + + // find the instances that need to be removed + const instancesToDeleted = existingInstances.filter( + (i) => !options.environments.includes(i.environmentId) + ); + + // delete the instances no longer selected + for (const instance of instancesToDeleted) { + await tx.taskScheduleInstance.delete({ + where: { + id: instance.id, + }, + }); + } + + const registerService = new RegisterNextTaskScheduleInstanceService(tx); + + for (const instance of newInstances) { + await registerService.call(instance.id); + } + + if (scheduleHasChanged) { + for (const instance of updatingInstances) { + await registerService.call(instance.id); + } + } + + return { scheduleRecord }; + }, + { timeout: 10_000 } + ); } #createReturnObject(taskSchedule: TaskSchedule, instances: InstanceWithEnvironment[]) { diff --git a/internal-packages/database/README.md b/internal-packages/database/README.md index 8f15e7ebb1..41612ac2bd 100644 --- a/internal-packages/database/README.md +++ b/internal-packages/database/README.md @@ -5,7 +5,7 @@ This is the internal database package for the Trigger.dev project. It exports a ### How to add a new index on a large table 1. Modify the Prisma.schema with a single index change (no other changes, just one index at a time) -2. Create a Prisma migration using `cd packages/database && pnpm run db:migrate:dev --create-only` +2. Create a Prisma migration using `cd internal-packages/database && pnpm run db:migrate:dev --create-only` 3. Modify the SQL file: add IF NOT EXISTS to it and CONCURRENTLY: ```sql diff --git a/internal-packages/database/prisma/migrations/20241023154826_add_schedule_instance_id_index_to_task_run/migration.sql b/internal-packages/database/prisma/migrations/20241023154826_add_schedule_instance_id_index_to_task_run/migration.sql new file mode 100644 index 0000000000..0843732786 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20241023154826_add_schedule_instance_id_index_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX CONCURRENTLY IF NOT EXISTS "TaskRun_scheduleInstanceId_idx" ON "TaskRun"("scheduleInstanceId"); \ No newline at end of file diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 9db032f752..92f255ebc5 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1793,6 +1793,7 @@ model TaskRun { @@index([projectId, taskIdentifier, status]) //Schedules @@index([scheduleId]) + @@index([scheduleInstanceId]) // Run page inspector @@index([spanId]) @@index([parentSpanId]) diff --git a/references/v3-catalog/src/management.ts b/references/v3-catalog/src/management.ts index 661f00acab..cb12c947e9 100644 --- a/references/v3-catalog/src/management.ts +++ b/references/v3-catalog/src/management.ts @@ -256,8 +256,8 @@ async function doTriggerUnfriendlyTaskId() { } // doRuns().catch(console.error); -doListRuns().catch(console.error); +// doListRuns().catch(console.error); // doScheduleLists().catch(console.error); -// doSchedules().catch(console.error); +doSchedules().catch(console.error); // doEnvVars().catch(console.error); // doTriggerUnfriendlyTaskId().catch(console.error);