Skip to content

Commit

Permalink
fix: removing schedule instance no longer crashes (#1430)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam authored Oct 24, 2024
1 parent 9bc641d commit cfb9685
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { CronPattern, UpsertSchedule } from "~/v3/schedules";
import { UpsertTaskScheduleService } from "~/v3/services/upsertTaskSchedule.server";
import { AIGeneratedCronField } from "../resources.orgs.$organizationSlug.projects.$projectParam.schedules.new.natural-language";
import { TimezoneList } from "~/components/scheduled/timezones";
import { logger } from "~/services/logger.server";

const cronFormat = `* * * * *
┬ ┬ ┬ ┬ ┬
Expand Down Expand Up @@ -94,9 +95,9 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
submission.value?.friendlyId === result.id ? "Schedule updated" : "Schedule created"
);
} catch (error: any) {
const errorMessage = `Failed: ${
error instanceof Error ? error.message : JSON.stringify(error)
}`;
logger.error("Failed to create schedule", error);

const errorMessage = `Something went wrong. Please try again.`;
return redirectWithErrorMessage(
v3SchedulesPath({ slug: organizationSlug }, { slug: projectParam }),
request,
Expand Down
290 changes: 146 additions & 144 deletions apps/webapp/app/v3/services/upsertTaskSchedule.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Prisma, TaskSchedule } from "@trigger.dev/database";
import cronstrue from "cronstrue";
import { nanoid } from "nanoid";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { $transaction } from "~/db.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { UpsertSchedule } from "../schedules";
import { calculateNextScheduledTimestamp } from "../utils/calculateNextSchedule.server";
Expand Down Expand Up @@ -31,124 +31,45 @@ export class UpsertTaskScheduleService extends BaseService {
const checkSchedule = new CheckScheduleService(this._prisma);
await checkSchedule.call(projectId, schedule);

const result = await $transaction(this._prisma, async (tx) => {
const deduplicationKey =
typeof schedule.deduplicationKey === "string" && schedule.deduplicationKey !== ""
? schedule.deduplicationKey
: nanoid(24);
const deduplicationKey =
typeof schedule.deduplicationKey === "string" && schedule.deduplicationKey !== ""
? schedule.deduplicationKey
: nanoid(24);

const existingSchedule = schedule.friendlyId
? await tx.taskSchedule.findUnique({
where: {
friendlyId: schedule.friendlyId,
},
})
: await tx.taskSchedule.findUnique({
where: {
projectId_deduplicationKey: {
projectId,
deduplicationKey,
},
const existingSchedule = schedule.friendlyId
? await this._prisma.taskSchedule.findUnique({
where: {
friendlyId: schedule.friendlyId,
},
})
: await this._prisma.taskSchedule.findUnique({
where: {
projectId_deduplicationKey: {
projectId,
deduplicationKey,
},
});
},
});

const result = await (async (tx) => {
if (existingSchedule) {
if (existingSchedule.type === "DECLARATIVE") {
throw new ServiceValidationError("Cannot update a declarative schedule");
}

return await this.#updateExistingSchedule(tx, existingSchedule, schedule, projectId);
return await this.#updateExistingSchedule(existingSchedule, schedule);
} else {
return await this.#createNewSchedule(tx, schedule, projectId, deduplicationKey);
return await this.#createNewSchedule(schedule, projectId, deduplicationKey);
}
});
})();

if (!result) {
throw new Error("Failed to create or update the schedule");
}

const { scheduleRecord, instances } = result;

return this.#createReturnObject(scheduleRecord, instances);
}

async #createNewSchedule(
tx: PrismaClientOrTransaction,
options: UpsertTaskScheduleServiceOptions,
projectId: string,
deduplicationKey: string
) {
const scheduleRecord = await tx.taskSchedule.create({
data: {
projectId,
friendlyId: generateFriendlyId("sched"),
taskIdentifier: options.taskIdentifier,
deduplicationKey,
userProvidedDeduplicationKey:
options.deduplicationKey !== undefined && options.deduplicationKey !== "",
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : undefined,
},
});

const registerNextService = new RegisterNextTaskScheduleInstanceService(tx);

//create the instances (links to environments)
let instances: InstanceWithEnvironment[] = [];
for (const environmentId of options.environments) {
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
include: {
environment: {
include: {
orgMember: {
include: {
user: true,
},
},
},
},
},
});

await registerNextService.call(instance.id);

instances.push(instance);
throw new ServiceValidationError("Failed to create or update schedule");
}

return { scheduleRecord, instances };
}
const { scheduleRecord } = result;

async #updateExistingSchedule(
tx: PrismaClientOrTransaction,
existingSchedule: TaskSchedule,
options: UpsertTaskScheduleServiceOptions,
projectId: string
) {
//update the schedule
const scheduleRecord = await tx.taskSchedule.update({
where: {
id: existingSchedule.id,
},
data: {
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : null,
},
});

const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;

// find the existing instances
const existingInstances = await tx.taskScheduleInstance.findMany({
const instances = await this._prisma.taskScheduleInstance.findMany({
where: {
taskScheduleId: scheduleRecord.id,
},
Expand All @@ -165,18 +86,35 @@ export class UpsertTaskScheduleService extends BaseService {
},
});

// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];
return this.#createReturnObject(scheduleRecord, instances);
}

async #createNewSchedule(
options: UpsertTaskScheduleServiceOptions,
projectId: string,
deduplicationKey: string
) {
return await $transaction(this._prisma, async (tx) => {
const scheduleRecord = await tx.taskSchedule.create({
data: {
projectId,
friendlyId: generateFriendlyId("sched"),
taskIdentifier: options.taskIdentifier,
deduplicationKey,
userProvidedDeduplicationKey:
options.deduplicationKey !== undefined && options.deduplicationKey !== "",
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : undefined,
},
});

const registerNextService = new RegisterNextTaskScheduleInstanceService(tx);

for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
//create the instances (links to environments)

if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
for (const environmentId of options.environments) {
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
Expand All @@ -195,39 +133,21 @@ export class UpsertTaskScheduleService extends BaseService {
},
});

newInstances.push(instance);
await registerNextService.call(instance.id);
}
}

// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);

// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await tx.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}

const registerService = new RegisterNextTaskScheduleInstanceService(tx);

for (const instance of newInstances) {
await registerService.call(instance.id);
}

if (scheduleHasChanged) {
for (const instance of updatingInstances) {
await registerService.call(instance.id);
}
}
return { scheduleRecord };
});
}

const instances = await tx.taskScheduleInstance.findMany({
async #updateExistingSchedule(
existingSchedule: TaskSchedule,
options: UpsertTaskScheduleServiceOptions
) {
// find the existing instances
const existingInstances = await this._prisma.taskScheduleInstance.findMany({
where: {
taskScheduleId: scheduleRecord.id,
taskScheduleId: existingSchedule.id,
},
include: {
environment: {
Expand All @@ -242,7 +162,89 @@ export class UpsertTaskScheduleService extends BaseService {
},
});

return { scheduleRecord, instances };
return await $transaction(
this._prisma,
async (tx) => {
const scheduleRecord = await tx.taskSchedule.update({
where: {
id: existingSchedule.id,
},
data: {
generatorExpression: options.cron,
generatorDescription: cronstrue.toString(options.cron),
timezone: options.timezone ?? "UTC",
externalId: options.externalId ? options.externalId : null,
},
});

const scheduleHasChanged =
scheduleRecord.generatorExpression !== existingSchedule.generatorExpression ||
scheduleRecord.timezone !== existingSchedule.timezone;

// create the new instances
const newInstances: InstanceWithEnvironment[] = [];
const updatingInstances: InstanceWithEnvironment[] = [];

for (const environmentId of options.environments) {
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);

if (existingInstance) {
// Update the existing instance
updatingInstances.push(existingInstance);
} else {
// Create a new instance
const instance = await tx.taskScheduleInstance.create({
data: {
taskScheduleId: scheduleRecord.id,
environmentId,
},
include: {
environment: {
include: {
orgMember: {
include: {
user: true,
},
},
},
},
},
});

newInstances.push(instance);
}
}

// find the instances that need to be removed
const instancesToDeleted = existingInstances.filter(
(i) => !options.environments.includes(i.environmentId)
);

// delete the instances no longer selected
for (const instance of instancesToDeleted) {
await tx.taskScheduleInstance.delete({
where: {
id: instance.id,
},
});
}

const registerService = new RegisterNextTaskScheduleInstanceService(tx);

for (const instance of newInstances) {
await registerService.call(instance.id);
}

if (scheduleHasChanged) {
for (const instance of updatingInstances) {
await registerService.call(instance.id);
}
}

return { scheduleRecord };
},
{ timeout: 10_000 }
);
}

#createReturnObject(taskSchedule: TaskSchedule, instances: InstanceWithEnvironment[]) {
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/database/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This is the internal database package for the Trigger.dev project. It exports a
### How to add a new index on a large table

1. Modify the Prisma.schema with a single index change (no other changes, just one index at a time)
2. Create a Prisma migration using `cd packages/database && pnpm run db:migrate:dev --create-only`
2. Create a Prisma migration using `cd internal-packages/database && pnpm run db:migrate:dev --create-only`
3. Modify the SQL file: add IF NOT EXISTS to it and CONCURRENTLY:

```sql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- CreateIndex
CREATE INDEX CONCURRENTLY IF NOT EXISTS "TaskRun_scheduleInstanceId_idx" ON "TaskRun"("scheduleInstanceId");
1 change: 1 addition & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,7 @@ model TaskRun {
@@index([projectId, taskIdentifier, status])
//Schedules
@@index([scheduleId])
@@index([scheduleInstanceId])
// Run page inspector
@@index([spanId])
@@index([parentSpanId])
Expand Down
4 changes: 2 additions & 2 deletions references/v3-catalog/src/management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ async function doTriggerUnfriendlyTaskId() {
}

// doRuns().catch(console.error);
doListRuns().catch(console.error);
// doListRuns().catch(console.error);
// doScheduleLists().catch(console.error);
// doSchedules().catch(console.error);
doSchedules().catch(console.error);
// doEnvVars().catch(console.error);
// doTriggerUnfriendlyTaskId().catch(console.error);

0 comments on commit cfb9685

Please sign in to comment.