diff --git a/apps/pty-proxy/src/controller/agent-socket.ts b/apps/pty-proxy/src/controller/agent-socket.ts index 2b9c2734..87986f5c 100644 --- a/apps/pty-proxy/src/controller/agent-socket.ts +++ b/apps/pty-proxy/src/controller/agent-socket.ts @@ -125,7 +125,7 @@ export class AgentSocket { "name" | "version" | "kind" | "identifier" | "workspaceId" >, ) { - const [res] = await upsertResources(db, [ + const { updated } = await upsertResources(db, [ { ...resource, name: this.name, @@ -136,6 +136,7 @@ export class AgentSocket { updatedAt: new Date(), }, ]); + const res = updated.at(0); if (res == null) throw new Error("Failed to create resource"); this.resource = res; agents.set(res.id, { lastSync: new Date(), agent: this }); diff --git a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts index 1c7a3eca..5768603c 100644 --- a/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts +++ b/apps/webservice/src/app/api/v1/resources/[resourceId]/route.ts @@ -93,9 +93,10 @@ export const PATCH = request() { status: 404 }, ); - const t = await upsertResources(db, [_.merge(resource, body)]); - - return NextResponse.json(t[0]); + const { updated } = await upsertResources(db, [_.merge(resource, body)]); + const res = updated.at(0); + if (res == null) throw new Error("Failed to update resource"); + return NextResponse.json(res); }); export const DELETE = request() diff --git a/apps/webservice/src/app/api/v1/resources/route.ts b/apps/webservice/src/app/api/v1/resources/route.ts index d0b2ba0c..7faab0bc 100644 --- a/apps/webservice/src/app/api/v1/resources/route.ts +++ b/apps/webservice/src/app/api/v1/resources/route.ts @@ -63,6 +63,6 @@ export const POST = request() })), ); - return NextResponse.json({ count: resources.length }); + return NextResponse.json({ count: resources.all.length }); }, ); diff --git a/packages/api/src/router/environment.ts b/packages/api/src/router/environment.ts index b7d95f7b..8dffe790 100644 --- a/packages/api/src/router/environment.ts +++ b/packages/api/src/router/environment.ts @@ -28,7 +28,7 @@ import { updateEnvironment, } from "@ctrlplane/db/schema"; import { - dispatchJobsForNewResources, + dispatchJobsForAddedResources, getEventsForEnvironmentDeleted, handleEvent, } from "@ctrlplane/job-dispatch"; @@ -324,7 +324,7 @@ export const environmentRouter = createTRPCRouter({ } if (newResources.length > 0) { - await dispatchJobsForNewResources( + await dispatchJobsForAddedResources( ctx.db, newResources.map((r) => r.id), input.id, diff --git a/packages/job-dispatch/src/events/triggers/resource-deleted.ts b/packages/job-dispatch/src/events/triggers/resource-deleted.ts index e1301c12..d592cbac 100644 --- a/packages/job-dispatch/src/events/triggers/resource-deleted.ts +++ b/packages/job-dispatch/src/events/triggers/resource-deleted.ts @@ -10,8 +10,6 @@ import { ResourceFilterType } from "@ctrlplane/validators/resources"; /** * Get events for a resource that has been deleted. - * NOTE: Because we may need to do inner joins on resource metadata for the filter, - * this actually needs to be called before the resource is actually deleted. * @param resource */ export const getEventsForResourceDeleted = async ( diff --git a/packages/job-dispatch/src/index.ts b/packages/job-dispatch/src/index.ts index f6cfd1e2..378a60f2 100644 --- a/packages/job-dispatch/src/index.ts +++ b/packages/job-dispatch/src/index.ts @@ -6,8 +6,6 @@ export * from "./policy-checker.js"; export * from "./policy-create.js"; export * from "./release-sequencing.js"; export * from "./gradual-rollout.js"; -export * from "./new-resource.js"; -export * from "./resource.js"; export * from "./lock-checker.js"; export * from "./queue.js"; @@ -24,3 +22,4 @@ export * from "./policies/release-window.js"; export * from "./environment-creation.js"; export * from "./pending-job-checker.js"; export * from "./events/index.js"; +export * from "./resource/index.js"; diff --git a/packages/job-dispatch/src/new-resource.ts b/packages/job-dispatch/src/new-resource.ts deleted file mode 100644 index 3487fac7..00000000 --- a/packages/job-dispatch/src/new-resource.ts +++ /dev/null @@ -1,80 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import { isPresent } from "ts-is-present"; - -import { and, desc, eq, takeFirstOrNull } from "@ctrlplane/db"; -import * as SCHEMA from "@ctrlplane/db/schema"; - -import { dispatchReleaseJobTriggers } from "./job-dispatch.js"; -import { isPassingAllPolicies } from "./policy-checker.js"; -import { createJobApprovals } from "./policy-create.js"; -import { createReleaseJobTriggers } from "./release-job-trigger.js"; - -/** - * Dispatches jobs for new targets added to an environment. - */ -export async function dispatchJobsForNewResources( - db: Tx, - newResourceIds: string[], - envId: string, -): Promise { - const releaseChannels = await db.query.environment.findFirst({ - where: eq(SCHEMA.environment.id, envId), - with: { - releaseChannels: { with: { releaseChannel: true } }, - policy: { - with: { - environmentPolicyReleaseChannels: { with: { releaseChannel: true } }, - }, - }, - system: { with: { deployments: true } }, - }, - }); - if (releaseChannels == null) return; - - const envReleaseChannels = releaseChannels.releaseChannels; - const policyReleaseChannels = - releaseChannels.policy?.environmentPolicyReleaseChannels ?? []; - const { deployments } = releaseChannels.system; - - const releasePromises = deployments.map(async (deployment) => { - const envReleaseChannel = envReleaseChannels.find( - (erc) => erc.deploymentId === deployment.id, - ); - const policyReleaseChannel = policyReleaseChannels.find( - (prc) => prc.deploymentId === deployment.id, - ); - const { releaseFilter } = - envReleaseChannel?.releaseChannel ?? - policyReleaseChannel?.releaseChannel ?? - {}; - return db - .select() - .from(SCHEMA.release) - .where( - and( - eq(SCHEMA.release.deploymentId, deployment.id), - SCHEMA.releaseMatchesCondition(db, releaseFilter ?? undefined), - ), - ) - .orderBy(desc(SCHEMA.release.createdAt)) - .limit(1) - .then(takeFirstOrNull); - }); - const releases = await Promise.all(releasePromises).then((rows) => - rows.filter(isPresent), - ); - if (releases.length === 0) return; - - const releaseJobTriggers = await createReleaseJobTriggers(db, "new_resource") - .resources(newResourceIds) - .environments([envId]) - .releases(releases.map((r) => r.id)) - .then(createJobApprovals) - .insert(); - if (releaseJobTriggers.length === 0) return; - - await dispatchReleaseJobTriggers(db) - .filter(isPassingAllPolicies) - .releaseTriggers(releaseJobTriggers) - .dispatch(); -} diff --git a/packages/job-dispatch/src/resource.ts b/packages/job-dispatch/src/resource.ts deleted file mode 100644 index caf568a9..00000000 --- a/packages/job-dispatch/src/resource.ts +++ /dev/null @@ -1,549 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { InsertResource, Resource } from "@ctrlplane/db/schema"; -import _ from "lodash"; - -import { - and, - buildConflictUpdateColumns, - eq, - inArray, - isNotNull, - isNull, - or, -} from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import { - deploymentResourceRelationship, - environment, - resource, - resourceMatchesMetadata, - resourceMetadata, - resourceRelationship, - resourceVariable, - system, -} from "@ctrlplane/db/schema"; -import { logger } from "@ctrlplane/logger"; -import { variablesAES256 } from "@ctrlplane/secrets"; - -import { getEventsForResourceDeleted, handleEvent } from "./events/index.js"; -import { dispatchJobsForNewResources } from "./new-resource.js"; - -const log = logger.child({ label: "upsert-resources" }); - -const isNotDeleted = isNull(resource.deletedAt); - -const getExistingResourcesForProvider = (db: Tx, providerId: string) => - db - .select() - .from(resource) - .where(and(eq(resource.providerId, providerId), isNotDeleted)); - -const getDeletedResourcesForProvider = (db: Tx, providerId: string) => - db - .select() - .from(resource) - .where( - and(eq(resource.providerId, providerId), isNotNull(resource.deletedAt)), - ); - -const dispatchChangedResources = async ( - db: Tx, - workspaceId: string, - resourceIds: string[], -) => { - const workspaceEnvs = await db - .select({ id: environment.id, resourceFilter: environment.resourceFilter }) - .from(environment) - .innerJoin(system, eq(system.id, environment.systemId)) - .where( - and( - eq(system.workspaceId, workspaceId), - isNotNull(environment.resourceFilter), - ), - ); - - for (const env of workspaceEnvs) { - db.select() - .from(resource) - .where( - and( - inArray(resource.id, resourceIds), - resourceMatchesMetadata(db, env.resourceFilter), - isNotDeleted, - ), - ) - .then((tgs) => { - if (tgs.length === 0) return; - dispatchJobsForNewResources( - db, - tgs.map((t) => t.id), - env.id, - ); - }); - } -}; - -type ResourceWithVariables = Resource & { - variables?: Array<{ key: string; value: any; sensitive: boolean }>; -}; - -/** - * Upserts resource variables for a list of resources. Updates existing - * variables, adds new ones, and removes deleted ones. Encrypts sensitive - * variable values before storing. - * - * @param tx - Database transaction - * @param resources - Array of resources with their variables - * @returns Array of resources that had their variables changed - */ -const upsertResourceVariables = async ( - tx: Tx, - resources: Array, -) => { - const resourceIds = resources.map((r) => r.id); - const existingResourceVariables = await tx - .select() - .from(resourceVariable) - .where(inArray(resourceVariable.resourceId, resourceIds)); - - const resourceVariablesValues = resources.flatMap(({ id, variables = [] }) => - variables.map(({ key, value, sensitive }) => ({ - resourceId: id, - key, - value: sensitive - ? variablesAES256().encrypt(JSON.stringify(value)) - : value, - sensitive, - })), - ); - - // Track resources with added variables - const resourcesWithAddedVars = new Set( - resourceVariablesValues - .filter( - (newVar) => - !existingResourceVariables.some( - (existing) => - existing.resourceId === newVar.resourceId && - existing.key === newVar.key, - ), - ) - .map((newVar) => newVar.resourceId), - ); - - // Track resources with modified variables - const resourcesWithModifiedVars = new Set( - resourceVariablesValues - .filter((newVar) => { - const existingVar = existingResourceVariables.find( - (existing) => - existing.resourceId === newVar.resourceId && - existing.key === newVar.key, - ); - return existingVar && existingVar.value !== newVar.value; - }) - .map((newVar) => newVar.resourceId), - ); - - // Track resources with deleted variables - const resourcesWithDeletedVars = new Set( - existingResourceVariables - .filter( - (existingVar) => - !resourceVariablesValues.some( - (newVar) => - newVar.resourceId === existingVar.resourceId && - newVar.key === existingVar.key, - ), - ) - .map((v) => v.resourceId), - ); - - const changedResources = new Set([ - ...resourcesWithAddedVars, - ...resourcesWithModifiedVars, - ...resourcesWithDeletedVars, - ]); - - if (resourceVariablesValues.length > 0) - await tx - .insert(resourceVariable) - .values(resourceVariablesValues) - .onConflictDoUpdate({ - target: [resourceVariable.key, resourceVariable.resourceId], - set: buildConflictUpdateColumns(resourceVariable, [ - "value", - "sensitive", - ]), - }); - - const variablesToDelete = existingResourceVariables.filter( - (variable) => - !resourceVariablesValues.some( - (newVariable) => - newVariable.resourceId === variable.resourceId && - newVariable.key === variable.key, - ), - ); - - if (variablesToDelete.length > 0) - await tx - .delete(resourceVariable) - .where( - inArray( - resourceVariable.id, - variablesToDelete.map((m) => m.id), - ), - ) - .catch((err) => { - log.error("Error deleting resource variables", { error: err }); - throw err; - }); - - return changedResources; -}; - -type ResourceWithMetadata = Resource & { metadata?: Record }; - -const upsertResourceMetadata = async ( - tx: Tx, - resources: Array, -) => { - const resourceIds = resources.map((r) => r.id); - const existingResourceMetadata = await tx - .select() - .from(resourceMetadata) - .where(inArray(resourceMetadata.resourceId, resourceIds)); - - const resourceMetadataValues = resources.flatMap((resource) => { - const { id, metadata = {} } = resource; - - return Object.entries(metadata).map(([key, value]) => ({ - resourceId: id, - key, - value, - })); - }); - - const resourcesWithAddedMetadata = new Set( - resourceMetadataValues - .filter( - (newMetadata) => - !existingResourceMetadata.some( - (metadata) => - metadata.resourceId === newMetadata.resourceId && - metadata.key === newMetadata.key, - ), - ) - .map((metadata) => metadata.resourceId), - ); - - const resourcesWithDeletedMetadata = new Set( - existingResourceMetadata - .filter( - (metadata) => - !resourceMetadataValues.some( - (newMetadata) => - newMetadata.resourceId === metadata.resourceId && - newMetadata.key === metadata.key, - ), - ) - .map((metadata) => metadata.resourceId), - ); - - const resourcesWithUpdatedMetadata = new Set( - resourceMetadataValues - .filter((newMetadata) => - existingResourceMetadata.some( - (metadata) => - metadata.resourceId === newMetadata.resourceId && - metadata.key === newMetadata.key && - metadata.value !== newMetadata.value, - ), - ) - .map((metadata) => metadata.resourceId), - ); - - log.info("resourcesWithAddedMetadata", { resourcesWithAddedMetadata }); - log.info("resourcesWithUpdatedMetadata", { resourcesWithUpdatedMetadata }); - log.info("resourcesWithDeletedMetadata", { resourcesWithDeletedMetadata }); - - const changedResources = new Set([ - ...resourcesWithAddedMetadata, - ...resourcesWithUpdatedMetadata, - ...resourcesWithDeletedMetadata, - ]); - - if (resourceMetadataValues.length > 0) - await tx - .insert(resourceMetadata) - .values(resourceMetadataValues) - .onConflictDoUpdate({ - target: [resourceMetadata.resourceId, resourceMetadata.key], - set: buildConflictUpdateColumns(resourceMetadata, ["value"]), - }) - .catch((err) => { - log.error("Error inserting resource metadata", { error: err }); - throw err; - }); - - const metadataToDelete = existingResourceMetadata.filter( - (metadata) => - !resourceMetadataValues.some( - (newMetadata) => - newMetadata.resourceId === metadata.resourceId && - newMetadata.key === metadata.key, - ), - ); - - if (metadataToDelete.length > 0) - await tx.delete(resourceMetadata).where( - inArray( - resourceMetadata.id, - metadataToDelete.map((m) => m.id), - ), - ); - - return changedResources; -}; - -export type ResourceToInsert = InsertResource & { - metadata?: Record; - variables?: Array<{ key: string; value: any; sensitive: boolean }>; -}; - -export const upsertResources = async ( - tx: Tx, - resourcesToInsert: ResourceToInsert[], -) => { - const workspaceId = resourcesToInsert[0]?.workspaceId; - if (workspaceId == null) throw new Error("Workspace ID is required"); - if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) { - throw new Error("All resources must belong to the same workspace"); - } - - log.info("upsertResources", { - resourcesToInsert: resourcesToInsert.map((r) => r.identifier), - }); - - try { - // Get existing resources from the database, grouped by providerId. - // - For resources without a providerId, look them up by workspaceId and - // identifier. - // - For resources with a providerId, get all resources for that provider. - log.info("Upserting resources", { - resourcesToInsertCount: resourcesToInsert.length, - }); - const resourcesBeforeInsertPromises = _.chain(resourcesToInsert) - .groupBy((r) => r.providerId) - .map(async (resources) => { - const providerId = resources[0]?.providerId; - - return providerId == null - ? db - .select() - .from(resource) - .where( - or( - ...resources.map((r) => - and( - eq(resource.workspaceId, r.workspaceId), - eq(resource.identifier, r.identifier), - isNotDeleted, - ), - ), - ), - ) - : getExistingResourcesForProvider(tx, providerId); - }) - .value(); - - const deletedResourcesBeforeInsertPromises = _.chain(resourcesToInsert) - .groupBy((r) => r.providerId) - .map(async (resources) => { - const providerId = resources[0]?.providerId; - return providerId == null - ? db - .select() - .from(resource) - .where( - and( - eq(resource.workspaceId, workspaceId), - isNotNull(resource.deletedAt), - ), - ) - : getDeletedResourcesForProvider(tx, providerId); - }) - .value(); - - const resourcesBeforeInsert = await Promise.all( - resourcesBeforeInsertPromises, - ).then((r) => r.flat()); - - log.info("resourcesBeforeInsert", { - resourcesBeforeInsert: resourcesBeforeInsert.map((r) => r.identifier), - }); - - const deletedResourcesBeforeInsert = await Promise.all( - deletedResourcesBeforeInsertPromises, - ).then((r) => r.flat()); - - log.info("deletedResourcesBeforeInsert", { - deletedResourcesBeforeInsert: deletedResourcesBeforeInsert.map( - (r) => r.identifier, - ), - }); - - const resources = await tx - .insert(resource) - .values(resourcesToInsert) - .onConflictDoUpdate({ - target: [resource.identifier, resource.workspaceId], - set: { - ...buildConflictUpdateColumns(resource, [ - "name", - "version", - "kind", - "config", - "providerId", - ]), - updatedAt: new Date(), - deletedAt: null, - }, - }) - .returning() - .then((resources) => - resources.map((r) => ({ - ...r, - ...resourcesToInsert.find( - (ri) => - ri.identifier === r.identifier && - ri.workspaceId === r.workspaceId, - ), - })), - ); - - const [changedResourcesMetadata, changedResourcesVariables] = - await Promise.all([ - upsertResourceMetadata(tx, resources), - upsertResourceVariables(tx, resources), - ]); - - const changedResourceIds = new Set([ - ...Array.from(changedResourcesMetadata), - ...Array.from(changedResourcesVariables), - ]); - - log.info("changedResourceIds", { changedResourceIds }); - - const newResources = resources.filter( - (r) => - !resourcesBeforeInsert.some((er) => er.identifier === r.identifier), - ); - log.info("new resources", { newResources }); - for (const resource of newResources) changedResourceIds.add(resource.id); - - const previouslySoftDeletedResources = resources.filter((r) => - deletedResourcesBeforeInsert.some((er) => er.identifier === r.identifier), - ); - for (const resource of previouslySoftDeletedResources) - changedResourceIds.add(resource.id); - log.info("previouslySoftDeletedResources", { - previouslySoftDeletedResources, - }); - - log.info("new resources and providerId", { - providerId: resourcesToInsert[0]?.providerId, - newResources, - }); - - if (changedResourceIds.size > 0) - await dispatchChangedResources( - db, - workspaceId, - Array.from(changedResourceIds), - ); - - const resourcesToDelete = resourcesBeforeInsert.filter( - (r) => - !resources.some( - (newResource) => newResource.identifier === r.identifier, - ), - ); - - const newResourceCount = newResources.length; - const resourcesToInsertCount = resourcesToInsert.length; - const resourcesToDeleteCount = resourcesToDelete.length; - const resourcesBeforeInsertCount = resourcesBeforeInsert.length; - log.info( - `Found ${newResourceCount} new resources out of ${resourcesToInsertCount} total resources`, - { - newResourceCount, - resourcesToInsertCount, - resourcesToDeleteCount, - resourcesBeforeInsertCount, - }, - ); - - if (resourcesToDelete.length > 0) { - await deleteResources(tx, resourcesToDelete).catch((err) => { - log.error("Error deleting resources", { error: err }); - throw err; - }); - - log.info(`Deleted ${resourcesToDelete.length} resources`, { - resourcesToDelete, - }); - } - - return resources; - } catch (err) { - log.error("Error upserting resources", { error: err }); - throw err; - } -}; - -const deleteObjectsAssociatedWithResource = (tx: Tx, resource: Resource) => - Promise.all([ - tx - .delete(resourceRelationship) - .where( - or( - eq(resourceRelationship.sourceId, resource.id), - eq(resourceRelationship.targetId, resource.id), - ), - ), - tx - .delete(deploymentResourceRelationship) - .where( - eq( - deploymentResourceRelationship.resourceIdentifier, - resource.identifier, - ), - ), - ]); - -/** - * Delete resources from the database. - * - * @param tx - The transaction to use. - * @param resourceIds - The ids of the resources to delete. - */ -export const deleteResources = async (tx: Tx, resources: Resource[]) => { - const eventsPromises = Promise.all( - resources.map(getEventsForResourceDeleted), - ); - const events = await eventsPromises.then((res) => res.flat()); - await Promise.all(events.map(handleEvent)); - const resourceIds = resources.map((r) => r.id); - const deleteAssociatedObjects = Promise.all( - resources.map((r) => deleteObjectsAssociatedWithResource(tx, r)), - ); - await Promise.all([ - deleteAssociatedObjects, - tx - .update(resource) - .set({ deletedAt: new Date() }) - .where(inArray(resource.id, resourceIds)), - ]); -}; diff --git a/packages/job-dispatch/src/resource/delete.ts b/packages/job-dispatch/src/resource/delete.ts new file mode 100644 index 00000000..7ec4fa4c --- /dev/null +++ b/packages/job-dispatch/src/resource/delete.ts @@ -0,0 +1,57 @@ +import type { Tx } from "@ctrlplane/db"; +import type { Resource } from "@ctrlplane/db/schema"; +import _ from "lodash"; + +import { eq, inArray, or } from "@ctrlplane/db"; +import { + deploymentResourceRelationship, + resource, + resourceRelationship, +} from "@ctrlplane/db/schema"; + +import { getEventsForResourceDeleted, handleEvent } from "../events/index.js"; + +const deleteObjectsAssociatedWithResource = (tx: Tx, resource: Resource) => + Promise.all([ + tx + .delete(resourceRelationship) + .where( + or( + eq(resourceRelationship.sourceId, resource.id), + eq(resourceRelationship.targetId, resource.id), + ), + ), + tx + .delete(deploymentResourceRelationship) + .where( + eq( + deploymentResourceRelationship.resourceIdentifier, + resource.identifier, + ), + ), + ]); + +/** + * Delete resources from the database. + * + * @param tx - The transaction to use. + * @param resourceIds - The ids of the resources to delete. + */ +export const deleteResources = async (tx: Tx, resources: Resource[]) => { + const eventsPromises = Promise.all( + resources.map(getEventsForResourceDeleted), + ); + const events = await eventsPromises.then((res) => res.flat()); + await Promise.all(events.map(handleEvent)); + const resourceIds = resources.map((r) => r.id); + const deleteAssociatedObjects = Promise.all( + resources.map((r) => deleteObjectsAssociatedWithResource(tx, r)), + ); + await Promise.all([ + deleteAssociatedObjects, + tx + .update(resource) + .set({ deletedAt: new Date() }) + .where(inArray(resource.id, resourceIds)), + ]); +}; diff --git a/packages/job-dispatch/src/resource/dispatch-resource.ts b/packages/job-dispatch/src/resource/dispatch-resource.ts new file mode 100644 index 00000000..de296dfa --- /dev/null +++ b/packages/job-dispatch/src/resource/dispatch-resource.ts @@ -0,0 +1,192 @@ +import type { Tx } from "@ctrlplane/db"; +import { isPresent } from "ts-is-present"; + +import { and, desc, eq, inArray, takeFirstOrNull } from "@ctrlplane/db"; +import * as SCHEMA from "@ctrlplane/db/schema"; +import { logger } from "@ctrlplane/logger"; + +import { handleEvent } from "../events/index.js"; +import { dispatchReleaseJobTriggers } from "../job-dispatch.js"; +import { isPassingAllPolicies } from "../policy-checker.js"; +import { createJobApprovals } from "../policy-create.js"; +import { createReleaseJobTriggers } from "../release-job-trigger.js"; + +const log = logger.child({ label: "dispatch-resource" }); + +/** + * Gets an environment with its associated release channels, policy, and system information + * @param db - Database transaction + * @param envId - Environment ID to look up + * @returns Promise resolving to the environment with its relationships or null if not found + */ +const getEnvironmentWithReleaseChannels = (db: Tx, envId: string) => + db.query.environment.findFirst({ + where: eq(SCHEMA.environment.id, envId), + with: { + releaseChannels: { with: { releaseChannel: true } }, + policy: { + with: { + environmentPolicyReleaseChannels: { with: { releaseChannel: true } }, + }, + }, + system: { with: { deployments: true } }, + }, + }); + +/** + * Dispatches jobs for newly added resources in an environment + * @param db - Database transaction + * @param resourceIds - IDs of the resources that were added + * @param envId - ID of the environment the resources were added to + */ +export async function dispatchJobsForAddedResources( + db: Tx, + resourceIds: string[], + envId: string, +): Promise { + log.info("Dispatching jobs for added resources", { resourceIds, envId }); + + const environment = await getEnvironmentWithReleaseChannels(db, envId); + if (environment == null) { + log.warn("Environment not found", { envId }); + return; + } + + const { releaseChannels, policy, system } = environment; + const { deployments } = system; + const policyReleaseChannels = policy?.environmentPolicyReleaseChannels ?? []; + const deploymentsWithReleaseFilter = deployments.map((deployment) => { + const envReleaseChannel = releaseChannels.find( + (erc) => erc.deploymentId === deployment.id, + ); + const policyReleaseChannel = policyReleaseChannels.find( + (prc) => prc.deploymentId === deployment.id, + ); + + const { releaseFilter } = + envReleaseChannel?.releaseChannel ?? + policyReleaseChannel?.releaseChannel ?? + {}; + return { ...deployment, releaseFilter }; + }); + + log.debug("Fetching latest releases", { + deploymentCount: deployments.length, + }); + const releasePromises = deploymentsWithReleaseFilter.map( + ({ id, releaseFilter }) => + db + .select() + .from(SCHEMA.release) + .where( + and( + eq(SCHEMA.release.deploymentId, id), + SCHEMA.releaseMatchesCondition(db, releaseFilter ?? undefined), + ), + ) + .orderBy(desc(SCHEMA.release.createdAt)) + .limit(1) + .then(takeFirstOrNull), + ); + + const releases = await Promise.all(releasePromises).then((rows) => + rows.filter(isPresent), + ); + if (releases.length === 0) { + log.info("No releases found for deployments"); + return; + } + + log.debug("Creating release job triggers"); + const releaseJobTriggers = await createReleaseJobTriggers(db, "new_resource") + .resources(resourceIds) + .environments([envId]) + .releases(releases.map((r) => r.id)) + .then(createJobApprovals) + .insert(); + + if (releaseJobTriggers.length === 0) { + log.info("No job triggers created"); + return; + } + + log.debug("Dispatching release job triggers", { + count: releaseJobTriggers.length, + }); + await dispatchReleaseJobTriggers(db) + .filter(isPassingAllPolicies) + .releaseTriggers(releaseJobTriggers) + .dispatch(); + + log.info("Successfully dispatched jobs for added resources", { + resourceCount: resourceIds.length, + triggerCount: releaseJobTriggers.length, + }); +} + +/** + * Gets all deployments associated with an environment + * @param db - Database transaction + * @param envId - Environment ID to get deployments for + * @returns Promise resolving to array of deployments + */ +const getEnvironmentDeployments = (db: Tx, envId: string) => + db + .select() + .from(SCHEMA.deployment) + .innerJoin(SCHEMA.system, eq(SCHEMA.deployment.systemId, SCHEMA.system.id)) + .innerJoin( + SCHEMA.environment, + eq(SCHEMA.system.id, SCHEMA.environment.systemId), + ) + .where(eq(SCHEMA.environment.id, envId)) + .then((rows) => rows.map((r) => r.deployment)); + +/** + * Dispatches hook events for resources that were removed from an environment + * @param db - Database transaction + * @param resourceIds - IDs of the resources that were removed + * @param envId - ID of the environment the resources were removed from + */ +export const dispatchEventsForRemovedResources = async ( + db: Tx, + resourceIds: string[], + envId: string, +): Promise => { + log.info("Dispatching events for removed resources", { resourceIds, envId }); + + const deployments = await getEnvironmentDeployments(db, envId); + if (deployments.length === 0) { + log.info("No deployments found for environment"); + return; + } + + const resources = await db.query.resource.findMany({ + where: inArray(SCHEMA.resource.id, resourceIds), + }); + + log.debug("Creating removal events", { + resourceCount: resources.length, + deploymentCount: deployments.length, + }); + const events = resources.flatMap((resource) => + deployments.map((deployment) => ({ + action: "deployment.resource.removed" as const, + payload: { deployment, resource }, + })), + ); + + log.debug("Handling removal events", { eventCount: events.length }); + const handleEventPromises = events.map(handleEvent); + const results = await Promise.allSettled(handleEventPromises); + + const failures = results.filter((r) => r.status === "rejected").length; + if (failures > 0) + log.warn("Some removal events failed", { failureCount: failures }); + + log.info("Finished dispatching removal events", { + total: events.length, + succeeded: events.length - failures, + failed: failures, + }); +}; diff --git a/packages/job-dispatch/src/resource/index.ts b/packages/job-dispatch/src/resource/index.ts new file mode 100644 index 00000000..40a2163e --- /dev/null +++ b/packages/job-dispatch/src/resource/index.ts @@ -0,0 +1,3 @@ +export * from "./upsert.js"; +export * from "./delete.js"; +export * from "./dispatch-resource.js"; diff --git a/packages/job-dispatch/src/resource/insert-resource-metadata.ts b/packages/job-dispatch/src/resource/insert-resource-metadata.ts new file mode 100644 index 00000000..46aa3cf9 --- /dev/null +++ b/packages/job-dispatch/src/resource/insert-resource-metadata.ts @@ -0,0 +1,32 @@ +import type { Tx } from "@ctrlplane/db"; +import type { Resource } from "@ctrlplane/db/schema"; + +import { buildConflictUpdateColumns } from "@ctrlplane/db"; +import * as schema from "@ctrlplane/db/schema"; + +export type ResourceWithMetadata = Resource & { + metadata?: Record; +}; + +export const insertResourceMetadata = async ( + tx: Tx, + resources: ResourceWithMetadata[], +) => { + const resourceMetadataValues = resources.flatMap((resource) => { + const { id, metadata = {} } = resource; + return Object.entries(metadata).map(([key, value]) => ({ + resourceId: id, + key, + value, + })); + }); + if (resourceMetadataValues.length === 0) return; + + return tx + .insert(schema.resourceMetadata) + .values(resourceMetadataValues) + .onConflictDoUpdate({ + target: [schema.resourceMetadata.key, schema.resourceMetadata.resourceId], + set: buildConflictUpdateColumns(schema.resourceMetadata, ["value"]), + }); +}; diff --git a/packages/job-dispatch/src/resource/insert-resource-variables.ts b/packages/job-dispatch/src/resource/insert-resource-variables.ts new file mode 100644 index 00000000..9743f246 --- /dev/null +++ b/packages/job-dispatch/src/resource/insert-resource-variables.ts @@ -0,0 +1,70 @@ +import type { Tx } from "@ctrlplane/db"; +import type { Resource } from "@ctrlplane/db/schema"; +import _ from "lodash"; + +import { buildConflictUpdateColumns, inArray } from "@ctrlplane/db"; +import * as schema from "@ctrlplane/db/schema"; +import { variablesAES256 } from "@ctrlplane/secrets"; + +export type ResourceWithVariables = Resource & { + variables?: Array<{ key: string; value: any; sensitive: boolean }>; +}; + +export const insertResourceVariables = async ( + tx: Tx, + resources: ResourceWithVariables[], +) => { + const resourceIds = resources.map(({ id }) => id); + const existingVariables = await tx + .select() + .from(schema.resourceVariable) + .where(inArray(schema.resourceVariable.resourceId, resourceIds)); + + const resourceVariablesValues = resources.flatMap(({ id, variables = [] }) => + variables.map(({ key, value, sensitive }) => ({ + resourceId: id, + key, + value: sensitive + ? variablesAES256().encrypt(JSON.stringify(value)) + : value, + sensitive, + })), + ); + + if (resourceVariablesValues.length === 0) return; + + const updatedVariables = await tx + .insert(schema.resourceVariable) + .values(resourceVariablesValues) + .onConflictDoUpdate({ + target: [schema.resourceVariable.key, schema.resourceVariable.resourceId], + set: buildConflictUpdateColumns(schema.resourceVariable, [ + "value", + "sensitive", + ]), + }) + .returning(); + + const created = _.differenceWith( + updatedVariables, + existingVariables, + (a, b) => a.resourceId === b.resourceId && a.key === b.key, + ); + + const deleted = _.differenceWith( + existingVariables, + updatedVariables, + (a, b) => a.resourceId === b.resourceId && a.key === b.key, + ); + + const updated = _.intersectionWith( + updatedVariables, + existingVariables, + (a, b) => + a.resourceId === b.resourceId && + a.key === b.key && + (a.value !== b.value || a.sensitive !== b.sensitive), + ); + + return { created, deleted, updated }; +}; diff --git a/packages/job-dispatch/src/resource/insert-resources.ts b/packages/job-dispatch/src/resource/insert-resources.ts new file mode 100644 index 00000000..ebcc753f --- /dev/null +++ b/packages/job-dispatch/src/resource/insert-resources.ts @@ -0,0 +1,120 @@ +import type { Tx } from "@ctrlplane/db"; +import type { InsertResource, Resource } from "@ctrlplane/db/schema"; +import _ from "lodash"; + +import { and, buildConflictUpdateColumns, eq, or } from "@ctrlplane/db"; +import { resource } from "@ctrlplane/db/schema"; + +/** + * Gets resources for a specific provider + * @param tx - Database transaction + * @param providerId - ID of the provider to get resources for + * @param options - Options object + * @returns Promise resolving to array of resources + */ +const getResourcesByProvider = (tx: Tx, providerId: string) => + tx.select().from(resource).where(eq(resource.providerId, providerId)); + +const getResourcesByWorkspaceIdAndIdentifier = ( + tx: Tx, + resources: { workspaceId: string; identifier: string }[], +) => + tx + .select() + .from(resource) + .where( + or( + ...resources.map((r) => + and( + eq(resource.workspaceId, r.workspaceId), + eq(resource.identifier, r.identifier), + ), + ), + ), + ); + +/** + * Fetches existing resources from the database that match the resources to be + * inserted. For resources without a providerId, looks them up by workspaceId + * and identifier. For resources with a providerId, gets all resources for that + * provider. + * + * @param tx - Database transaction + * @param resourcesToInsert - Array of resources to be inserted + * @returns Promise resolving to array of existing resources + */ +const findExistingResources = async ( + tx: Tx, + resourcesToInsert: InsertResource[], +): Promise => { + const resourcesByProvider = _.groupBy( + resourcesToInsert, + (r) => r.providerId ?? "null", + ); + + const promises = Object.entries(resourcesByProvider).map( + ([providerId, resources]) => + providerId === "null" + ? getResourcesByWorkspaceIdAndIdentifier(tx, resources) + : getResourcesByProvider(tx, providerId), + ); + + const results = await Promise.all(promises); + return results.flat(); +}; + +/** + * Inserts or updates resources in the database. Note that this function only + * handles the core resource fields - it does not insert/update associated + * metadata or variables. Those must be handled separately. + * + * @param tx - Database transaction + * @param resourcesToInsert - Array of resources to insert/update. Can include + * metadata and variables but these will not be + * persisted by this function. + * @returns Promise resolving to array of inserted/updated resources, with any + * metadata/variables from the input merged onto the DB records + */ +export const insertResources = async ( + tx: Tx, + resourcesToInsert: InsertResource[], +) => { + const existingResources = await findExistingResources(tx, resourcesToInsert); + + const resources = await tx + .insert(resource) + .values(resourcesToInsert) + .onConflictDoUpdate({ + target: [resource.identifier, resource.workspaceId], + set: { + ...buildConflictUpdateColumns(resource, [ + "name", + "version", + "kind", + "config", + "providerId", + ]), + updatedAt: new Date(), + deletedAt: null, + }, + }) + .returning(); + + const created = _.differenceBy( + resources, + existingResources, + (t) => t.identifier, + ); + const deleted = _.differenceBy( + existingResources, + resources, + (t) => t.identifier, + ); + const updated = _.intersectionBy( + resources, + existingResources, + (t) => t.identifier, + ); + + return { all: resources, created, deleted, updated }; +}; diff --git a/packages/job-dispatch/src/resource/upsert.ts b/packages/job-dispatch/src/resource/upsert.ts new file mode 100644 index 00000000..5984d529 --- /dev/null +++ b/packages/job-dispatch/src/resource/upsert.ts @@ -0,0 +1,142 @@ +import type { Tx } from "@ctrlplane/db"; +import type { InsertResource } from "@ctrlplane/db/schema"; +import _ from "lodash"; + +import { logger } from "@ctrlplane/logger"; + +import { deleteResources } from "./delete.js"; +import { + dispatchEventsForRemovedResources, + dispatchJobsForAddedResources, +} from "./dispatch-resource.js"; +import { insertResourceMetadata } from "./insert-resource-metadata.js"; +import { insertResourceVariables } from "./insert-resource-variables.js"; +import { insertResources } from "./insert-resources.js"; +import { getEnvironmentsByResourceWithIdentifiers } from "./utils.js"; + +const log = logger.child({ label: "upsert-resources" }); + +export type ResourceToInsert = InsertResource & { + metadata?: Record; + variables?: Array<{ key: string; value: any; sensitive: boolean }>; +}; + +export const upsertResources = async ( + tx: Tx, + resourcesToInsert: ResourceToInsert[], +) => { + log.info("Starting resource upsert", { + count: resourcesToInsert.length, + identifiers: resourcesToInsert.map((r) => r.identifier), + }); + + const workspaceId = resourcesToInsert[0]?.workspaceId; + if (workspaceId == null) throw new Error("Workspace ID is required"); + if (!resourcesToInsert.every((r) => r.workspaceId === workspaceId)) + throw new Error("All resources must belong to the same workspace"); + + try { + const resourceIdentifiers = resourcesToInsert.map((r) => r.identifier); + log.info("Getting environments before insert", { resourceIdentifiers }); + const envsBeforeInsert = await getEnvironmentsByResourceWithIdentifiers( + tx, + workspaceId, + resourceIdentifiers, + ); + + log.debug("Envs before insert", { + envs: envsBeforeInsert.map((e) => ({ + id: e.id, + resources: e.resources.map((r) => r.identifier), + })), + }); + + log.debug("Inserting resources"); + const resources = await insertResources(tx, resourcesToInsert); + const resourcesWithId = resources.all.map((r) => ({ + ...r, + ...resourcesToInsert.find( + (ri) => + ri.identifier === r.identifier && ri.workspaceId === r.workspaceId, + ), + })); + + log.debug("Inserting resource metadata and variables"); + await Promise.all([ + insertResourceMetadata(tx, resourcesWithId), + insertResourceVariables(tx, resourcesWithId), + ]); + + log.debug("Getting environments after insert"); + const envsAfterInsert = await getEnvironmentsByResourceWithIdentifiers( + tx, + workspaceId, + resourceIdentifiers, + ); + + log.debug("Envs after insert", { + envs: envsAfterInsert.map((e) => ({ + id: e.id, + resources: e.resources.map((r) => r.identifier), + })), + }); + const changedEnvs = envsAfterInsert.map((env) => { + const beforeEnv = envsBeforeInsert.find((e) => e.id === env.id); + const beforeResources = beforeEnv?.resources ?? []; + const afterResources = env.resources; + const removedResources = beforeResources.filter( + (br) => !afterResources.some((ar) => ar.id === br.id), + ); + const addedResources = afterResources.filter( + (ar) => !beforeResources.some((br) => br.id === ar.id), + ); + return { ...env, removedResources, addedResources }; + }); + + const deletedResourceIds = new Set(resources.deleted.map((r) => r.id)); + if (resources.deleted.length > 0) { + log.info("Deleting resources", { count: resources.deleted.length }); + await deleteResources(tx, resources.deleted).catch((err) => { + log.error("Error deleting resources", { error: err }); + throw err; + }); + } + + for (const env of changedEnvs) { + if (env.addedResources.length > 0) { + log.info("Dispatching jobs for added resources", { + envId: env.id, + count: env.addedResources.length, + }); + await dispatchJobsForAddedResources( + tx, + env.addedResources.map((r) => r.id), + env.id, + ); + } + + if (env.removedResources.length > 0) { + const removedIds = env.removedResources + .map((r) => r.id) + .filter((id) => !deletedResourceIds.has(id)); + + if (removedIds.length > 0) { + log.info("Dispatching hook events for removed resources", { + envId: env.id, + count: removedIds.length, + }); + await dispatchEventsForRemovedResources(tx, removedIds, env.id); + } + } + } + + log.info("Resource upsert completed successfully", { + added: resources.all.length, + deleted: resources.deleted.length, + }); + return resources; + } catch (err) { + log.error("Error upserting resources", { error: err }); + throw err; + } +}; diff --git a/packages/job-dispatch/src/resource/utils.ts b/packages/job-dispatch/src/resource/utils.ts new file mode 100644 index 00000000..22e71ff9 --- /dev/null +++ b/packages/job-dispatch/src/resource/utils.ts @@ -0,0 +1,106 @@ +import type { Tx } from "@ctrlplane/db"; + +import { and, eq, inArray, isNotNull, isNull, or } from "@ctrlplane/db"; +import * as schema from "@ctrlplane/db/schema"; + +/** + * Gets resources for a specific provider + * @param tx - Database transaction + * @param providerId - ID of the provider to get resources for + * @param options - Options object + * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources + * @returns Promise resolving to array of resources + */ +export const getResourcesByProvider = ( + tx: Tx, + providerId: string, + options: { deleted: boolean } = { deleted: false }, +) => + tx + .select() + .from(schema.resource) + .where( + and( + eq(schema.resource.providerId, providerId), + options.deleted + ? isNotNull(schema.resource.deletedAt) + : isNull(schema.resource.deletedAt), + ), + ); + +/** + * Gets resources matching the provided workspace IDs and identifiers + * Can filter for either deleted or non-deleted resources + * + * @param tx - Database transaction + * @param resources - Array of objects containing workspaceId and identifier to look up + * @param options - Options object + * @param options.deleted - If true, returns deleted resources. If false, returns non-deleted resources + * @returns Promise resolving to array of matching resources + */ +export const getResourcesByWorkspaceIdAndIdentifier = ( + tx: Tx, + resources: { workspaceId: string; identifier: string }[], + options: { deleted: boolean } = { deleted: false }, +) => + tx + .select() + .from(schema.resource) + .where( + or( + ...resources.map((r) => + and( + eq(schema.resource.workspaceId, r.workspaceId), + eq(schema.resource.identifier, r.identifier), + options.deleted + ? isNotNull(schema.resource.deletedAt) + : isNull(schema.resource.deletedAt), + ), + ), + ), + ); + +/** + * Groups provided resources by workspace environments matching them + * + * @param tx - Database transaction + * @param workspaceId - ID of the workspace to get environments for + * @param resourceIdentifiers - Array of resource identifiers to look up + * @returns Promise resolving to array of environments + */ +export const getEnvironmentsByResourceWithIdentifiers = ( + tx: Tx, + workspaceId: string, + resourceIdentifiers: string[], +) => + tx + .select({ + id: schema.environment.id, + resourceFilter: schema.environment.resourceFilter, + }) + .from(schema.environment) + .innerJoin(schema.system, eq(schema.environment.systemId, schema.system.id)) + .where( + and( + eq(schema.system.workspaceId, workspaceId), + isNotNull(schema.environment.resourceFilter), + ), + ) + .then((envs) => + Promise.all( + envs.map(async (env) => ({ + ...env, + resources: await tx + .select() + .from(schema.resource) + .where( + and( + inArray(schema.resource.identifier, resourceIdentifiers), + eq(schema.resource.workspaceId, workspaceId), + schema.resourceMatchesMetadata(tx, env.resourceFilter), + isNull(schema.resource.deletedAt), + ), + ), + })), + ), + );