diff --git a/bun.lockb b/bun.lockb index 4591e9f..2e5c1d7 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/mikro-orm.config.ts b/mikro-orm.config.ts index a0c3e39..3d46ccf 100644 --- a/mikro-orm.config.ts +++ b/mikro-orm.config.ts @@ -47,7 +47,7 @@ export default defineConfig({ WorkflowEditEvent, Attachment ], - dbName: 'storage/comfyui.manager.db', + dbName: process.cwd() + '/storage/comfyui.manager.db', debug: process.env.NODE_ENV === 'development', discovery: { disableDynamicFileAccess: false } }) diff --git a/package.json b/package.json index 5673dfa..d951bf2 100644 --- a/package.json +++ b/package.json @@ -28,10 +28,10 @@ "@hookform/resolvers": "^3.9.1", "@langchain/core": "^0.3.25", "@langchain/openai": "^0.3.16", - "@mikro-orm/cli": "6.4.1", - "@mikro-orm/core": "6.4.1", - "@mikro-orm/libsql": "6.4.1", - "@mikro-orm/reflection": "6.4.1", + "@mikro-orm/cli": "^6.4.3", + "@mikro-orm/core": "^6.4.3", + "@mikro-orm/libsql": "^6.4.3", + "@mikro-orm/reflection": "^6.4.3", "@radix-ui/react-accordion": "^1.2.2", "@radix-ui/react-alert-dialog": "^1.1.3", "@radix-ui/react-avatar": "^1.1.2", diff --git a/package.prod.json b/package.prod.json index 0e8c2e0..7bb231f 100644 --- a/package.prod.json +++ b/package.prod.json @@ -18,10 +18,10 @@ "@hookform/resolvers": "^3.9.1", "@langchain/core": "^0.3.25", "@langchain/openai": "^0.3.16", - "@mikro-orm/cli": "6.4.1", - "@mikro-orm/core": "6.4.1", - "@mikro-orm/libsql": "6.4.1", - "@mikro-orm/reflection": "6.4.1", + "@mikro-orm/cli": "^6.4.3", + "@mikro-orm/core": "^6.4.3", + "@mikro-orm/libsql": "^6.4.3", + "@mikro-orm/reflection": "^6.4.3", "@saintno/comfyui-sdk": "^0.2.43", "@saintno/needed-tools": "^0.3.5", "sharp": "^0.33.5", diff --git a/server/services/comfyui.ts b/server/services/comfyui.ts index afbc22a..41c7478 100644 --- a/server/services/comfyui.ts +++ b/server/services/comfyui.ts @@ -275,198 +275,233 @@ export class ComfyPoolInstance { } private async pickingJob() { + let tries = 1 const pool = this.pool - const em = await MikroORMInstance.getInstance().getEM() - const userRep = em.getRepository(User) - - const queuingTasks = await em.find( - WorkflowTask, - { - status: { - $in: [ETaskStatus.Queuing] + const emRaw = await MikroORMInstance.getInstance().getEM() + while (true) { + const em = emRaw.fork() + const userRep = em.getRepository(User) + const queuingTasks = await em.find( + WorkflowTask, + { + status: ETaskStatus.Queuing + }, + { + limit: 10, + populate: ['workflow', 'parent', 'trigger.user.weightOffset', 'workflow.rawWorkflow'], + orderBy: { createdAt: 'ASC' } } - }, - { - limit: 10, - populate: ['workflow', 'parent', 'trigger.user.weightOffset', 'workflow.rawWorkflow'], - orderBy: { createdAt: 'ASC' } - } - ) - try { - if (queuingTasks.length > 0) { - for (let i = 0; i < queuingTasks.length; i++) { - const task = queuingTasks[i] - const user = task.trigger?.user - const workflow = task.workflow - try { - const input = task.inputValues - let builder = getBuilder(workflow) - await this.updateTaskEventFn(em, task, ETaskStatus.Pending) - if (user) { - this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) - } - pool.run(async (api) => { - if (api.ext.manager.isSupported) { - // Set preview method before execute - // TODO: Add setting for this - void api.ext.manager.previewMethod('latent2rgb').catch((e) => { - console.error(e) - }) + ) + try { + if (queuingTasks.length > 0) { + tries = 0 + for (let i = 0; i < queuingTasks.length; i++) { + const task = queuingTasks[i] + const user = task.trigger?.user + 
const workflow = task.workflow + try { + const input = task.inputValues + let builder = getBuilder(workflow) + await this.updateTaskEventFn(em, task, ETaskStatus.Pending) + if (user) { + this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) } - const start = performance.now() - try { - const client = await em.findOne(Client, { id: api.id }) - if (client) { - task.client = client - await em.persist(task).flush() - await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) + pool.run(async (api) => { + if (api.ext.manager.isSupported) { + // Set preview method before execute + // TODO: Add setting for this + void api.ext.manager.previewMethod('latent2rgb').catch((e) => { + console.error(e) + }) } - for (const key in input) { - if (!workflow.mapInput?.[key]) { - this.logger.w('pickingJob', `Input key ${key} not found in workflow map`, { - key, - workflowId: workflow.id - }) - continue - } - const inputData = input[key] || workflow.mapInput?.[key].default - if (!inputData) { - continue + const start = performance.now() + try { + const client = await em.findOne(Client, { id: api.id }) + if (client) { + task.client = client + await em.persist(task).flush() + await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) } - switch (workflow.mapInput?.[key].type) { - case EValueType.Number: - case EValueUtilityType.Seed: - builder.input(key, Number(inputData)) - break - case EValueUtilityType.Prefixer: - builder.input(key, task.id) - break - case EValueType.String: - builder.input(key, String(inputData)) - break - case EValueType.File: - case EValueType.Video: - case EValueType.Image: - const attachmentId = inputData as string - const file = await em.findOneOrFail(Attachment, { id: attachmentId }) - const fileBlob = await AttachmentService.getInstance().getFileBlob(file.fileName) - if (!fileBlob) { - await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { - details: `Can not load attachments ${file.fileName}`, - clientId: api.id - }) - await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) - return - } - const uploadedImg = await api.uploadImage(fileBlob, file.fileName) - if (!uploadedImg) { - await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { - details: `Failed to upload attachment into comfy server, ${file.fileName}`, - clientId: api.id - }) - await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) - return - } - builder.input(key, uploadedImg.info.filename) - break - default: - builder.input(key, inputData) - break + for (const key in input) { + if (!workflow.mapInput?.[key]) { + this.logger.w('pickingJob', `Input key ${key} not found in workflow map`, { + key, + workflowId: workflow.id + }) + continue + } + const inputData = input[key] || workflow.mapInput?.[key].default + if (!inputData) { + continue + } + switch (workflow.mapInput?.[key].type) { + case EValueType.Number: + case EValueUtilityType.Seed: + builder.input(key, Number(inputData)) + break + case EValueUtilityType.Prefixer: + builder.input(key, task.id) + break + case EValueType.String: + builder.input(key, String(inputData)) + break + case EValueType.File: + case EValueType.Video: + case EValueType.Image: + const attachmentId = inputData as string + const file = await em.findOneOrFail(Attachment, { id: attachmentId }) + const fileBlob = await AttachmentService.getInstance().getFileBlob(file.fileName) + if (!fileBlob) { + await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { + details: `Can not load attachments ${file.fileName}`, + clientId: api.id + 
}) + await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) + return + } + const uploadedImg = await api.uploadImage(fileBlob, file.fileName) + if (!uploadedImg) { + await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { + details: `Failed to upload attachment into comfy server, ${file.fileName}`, + clientId: api.id + }) + await this.cachingService.set('LAST_TASK_CLIENT', api.id, Date.now()) + return + } + builder.input(key, uploadedImg.info.filename) + break + default: + builder.input(key, inputData) + break + } } - } - console.log(JSON.stringify(builder.workflow)) - return new CallWrapper(api, builder) - .onPending(async () => { - await this.updateTaskEventFn(em, task, ETaskStatus.Running, { - details: 'LOADING RESOURCES', - clientId: api.id - }) - }) - .onProgress(async (e) => { - await this.updateTaskEventFn(em, task, ETaskStatus.Running, { - details: JSON.stringify({ - key: 'progress', - data: { node: Number(e.node), max: Number(e.max), value: Number(e.value) } - }), - clientId: api.id - }) - }) - .onPreview(async (e) => { - const arrayBuffer = await e.arrayBuffer() - const base64String = Buffer.from(arrayBuffer).toString('base64') - await this.cachingService.set('PREVIEW', task.id, { blob64: base64String }) - }) - .onStart(async () => { - await this.updateTaskEventFn(em, task, ETaskStatus.Running, { - details: 'START RENDERING', - clientId: api.id + console.log(JSON.stringify(builder.workflow)) + return new CallWrapper(api, builder) + .onPending(async () => { + await this.updateTaskEventFn(em, task, ETaskStatus.Running, { + details: 'LOADING RESOURCES', + clientId: api.id + }) }) - }) - .onFinished((outData) => { - const backgroundFn = async () => { - await this.updateTaskEventFn(em, task, ETaskStatus.Success, { - details: 'DOWNLOADING OUTPUT', + .onProgress(async (e) => { + await this.updateTaskEventFn(em, task, ETaskStatus.Running, { + details: JSON.stringify({ + key: 'progress', + data: { node: Number(e.node), max: Number(e.max), value: Number(e.value) } + }), clientId: api.id }) - const attachment = AttachmentService.getInstance() - const output = await parseOutput(api, workflow, outData) - await this.updateTaskEventFn(em, task, ETaskStatus.Success, { - details: 'UPLOADING OUTPUT', + }) + .onPreview(async (e) => { + const arrayBuffer = await e.arrayBuffer() + const base64String = Buffer.from(arrayBuffer).toString('base64') + await this.cachingService.set('PREVIEW', task.id, { blob64: base64String }) + }) + .onStart(async () => { + await this.updateTaskEventFn(em, task, ETaskStatus.Running, { + details: 'START RENDERING', clientId: api.id }) - const tmpOutput = cloneDeep(output) as Record - // If key is array of Blob, convert it to base64 - for (const key in tmpOutput) { - if (Array.isArray(tmpOutput[key])) { - tmpOutput[key] = (await Promise.all( - tmpOutput[key].map(async (v, idx) => { - console.log('out', key, v) - if (v instanceof Blob) { - // Check if v is Video, Image or others - const blobType = classifyBlob(v) - switch (blobType) { - case 'image': { - await this.handleImageOutput(v, { key, idx, task, workflow }, attachment, em) - break - } - case 'video': { - await this.handleVideoOutput(v, { key, idx, task, workflow }, attachment, em) - break - } - default: { + }) + .onFinished((outData) => { + const backgroundFn = async () => { + await this.updateTaskEventFn(em, task, ETaskStatus.Success, { + details: 'DOWNLOADING OUTPUT', + clientId: api.id + }) + const attachment = AttachmentService.getInstance() + const output = await parseOutput(api, workflow, 
outData) + await this.updateTaskEventFn(em, task, ETaskStatus.Success, { + details: 'UPLOADING OUTPUT', + clientId: api.id + }) + const tmpOutput = cloneDeep(output) as Record + // If key is array of Blob, convert it to base64 + for (const key in tmpOutput) { + if (Array.isArray(tmpOutput[key])) { + tmpOutput[key] = (await Promise.all( + tmpOutput[key].map(async (v, idx) => { + console.log('out', key, v) + if (v instanceof Blob) { + // Check if v is Video, Image or others + const blobType = classifyBlob(v) + switch (blobType) { + case 'image': { + await this.handleImageOutput(v, { key, idx, task, workflow }, attachment, em) + break + } + case 'video': { + await this.handleVideoOutput(v, { key, idx, task, workflow }, attachment, em) + break + } + default: { + } } } + return v + }) + )) as string[] + } + } + const outputConfig = workflow.mapOutput + const outputData = Object.keys(outputConfig || {}).reduce( + (acc, val) => { + if (tmpOutput[val] && outputConfig?.[val]) { + acc[val] = { + type: outputConfig[val].type as EValueType, + value: tmpOutput[val] as any } - return v - }) - )) as string[] + } + return acc + }, + {} as Record< + string, + { + type: EValueType + value: any + } + > + ) + task.executionTime = performance.now() - start + task.outputValues = outputData + if (user) { + userRep.makeNotify(user, { + title: `Task is finished`, + type: ENotificationType.Info, + target: { + targetType: ENotificationTarget.WorkflowTask, + targetId: task.id + } + }) + this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) } + await Promise.all([ + em.flush(), + this.updateTaskEventFn(em, task, ETaskStatus.Success, { + details: 'FINISHED', + clientId: api.id, + data: outData + }) + ]) } - const outputConfig = workflow.mapOutput - const outputData = Object.keys(outputConfig || {}).reduce( - (acc, val) => { - if (tmpOutput[val] && outputConfig?.[val]) { - acc[val] = { - type: outputConfig[val].type as EValueType, - value: tmpOutput[val] as any - } - } - return acc - }, - {} as Record< - string, - { - type: EValueType - value: any - } - > + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Task execution timed out')), 60000) ) - task.executionTime = performance.now() - start - task.outputValues = outputData + Promise.race([backgroundFn(), timeoutPromise]).catch(async (e) => { + await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { + details: (e.cause as any)?.error?.message || e.message, + clientId: api.id + }) + console.error(e) + }) + }) + .onFailed(async (e) => { if (user) { userRep.makeNotify(user, { - title: `Task is finished`, - type: ENotificationType.Info, + title: `Task is failed`, + type: ENotificationType.Error, + priority: 2, + description: (e.cause as any)?.error?.message || e.message, target: { targetType: ENotificationTarget.WorkflowTask, targetId: task.id @@ -474,86 +509,55 @@ export class ComfyPoolInstance { }) this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) } - await Promise.all([ - em.flush(), - this.updateTaskEventFn(em, task, ETaskStatus.Success, { - details: 'FINISHED', - clientId: api.id, - data: outData - }) - ]) - } - const timeoutPromise = new Promise((_, reject) => - setTimeout(() => reject(new Error('Task execution timed out')), 60000) - ) - Promise.race([backgroundFn(), timeoutPromise]).catch(async (e) => { await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { details: (e.cause as any)?.error?.message || e.message, clientId: api.id }) console.error(e) }) - }) - .onFailed(async (e) => { - if 
(user) { - userRep.makeNotify(user, { - title: `Task is failed`, - type: ENotificationType.Error, - priority: 2, - description: (e.cause as any)?.error?.message || e.message, - target: { - targetType: ENotificationTarget.WorkflowTask, - targetId: task.id - } - }) - this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) - } - await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { - details: (e.cause as any)?.error?.message || e.message, - clientId: api.id + .run() + .catch(async (e) => { + throw e }) - console.error(e) - }) - .run() - .catch(async (e) => { - throw e - }) - } catch (e: any) { - if (user) { - userRep.makeNotify(user, { - title: `Task is failed`, - type: ENotificationType.Error, - description: (e.cause as any)?.error?.message || e?.message || "Can't execute task", - priority: 2, - target: { - targetType: ENotificationTarget.WorkflowTask, - targetId: task.id - } + } catch (e: any) { + if (user) { + userRep.makeNotify(user, { + title: `Task is failed`, + type: ENotificationType.Error, + description: (e.cause as any)?.error?.message || e?.message || "Can't execute task", + priority: 2, + target: { + targetType: ENotificationTarget.WorkflowTask, + targetId: task.id + } + }) + this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) + } + await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { + details: (e.cause as any)?.error?.message || e?.message || "Can't execute task", + clientId: api.id }) - this.cachingService.set('USER_EXECUTING_TASK', user.id, Date.now()) + console.error(e) + return false } - await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { - details: (e.cause as any)?.error?.message || e?.message || "Can't execute task", - clientId: api.id - }) - console.error(e) - return false - } - }, task.computedWeight) - } catch (e) { - console.error(e) - await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { - details: `Can't execute task ${task.id}` - }) + }, task.computedWeight) + } catch (e) { + console.error(e) + await this.updateTaskEventFn(em, task, ETaskStatus.Failed, { + details: `Can't execute task ${task.id}` + }) + } } + await this.cachingService.set('LAST_TASK_CLIENT', -1, Date.now()) + await em.flush() + } else { + if (tries < 100) tries++ } - await this.cachingService.set('LAST_TASK_CLIENT', -1, Date.now()) - await em.flush() + } catch (e) { + console.warn(e) } - } catch (e) { - console.warn(e) + await delay(tries * 10) } - delay(100).then(() => this.pickingJob()) } async setClientStatus(clientId: string, status: EClientStatus, msg?: string) { diff --git a/server/services/mikro-orm.ts b/server/services/mikro-orm.ts index 4982f36..2e8a16f 100644 --- a/server/services/mikro-orm.ts +++ b/server/services/mikro-orm.ts @@ -10,10 +10,15 @@ export class MikroORMInstance { private constructor() { this.logger = new Logger('MikroORMInstance') this.orm = MikroORM.init(dbConfig).then(async (orm) => { - const generator = orm.getSchemaGenerator() - await generator.updateSchema().catch((e) => { - this.logger.i('init', 'Schema is updated', e) - }) + const generator = orm.schema + await generator + .updateSchema() + .then(() => { + this.logger.i('init', 'Schema is updated') + }) + .catch((e) => { + this.logger.w('init', 'Update schema error', e) + }) this.logger.i('init', 'MikroORM initialized') return orm })
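
Note on the server/services/comfyui.ts change above: pickingJob() no longer re-schedules itself with delay(100).then(() => this.pickingJob()); it now runs as a single long-lived loop that forks the shared EntityManager on every iteration, polls for Queuing tasks, and then sleeps for tries * 10 ms, so the poll interval grows toward roughly one second while the queue stays empty and drops back as soon as work is found. The sketch below is a minimal illustration of that pattern only, not the project's code; fetchQueuedTasks and dispatch are hypothetical stand-ins for the em.find(WorkflowTask, ...) query and the pool.run(...) dispatch seen in the diff.

// Minimal sketch of the polling loop adopted by pickingJob(), assuming a
// MikroORM-style EntityManager with fork(); fetchQueuedTasks and dispatch
// are hypothetical placeholders for the real query and pool dispatch.
import { setTimeout as delay } from 'node:timers/promises'

interface EntityManagerLike {
  fork(): EntityManagerLike
}

async function pollQueue(
  emRaw: EntityManagerLike,
  fetchQueuedTasks: (em: EntityManagerLike) => Promise<unknown[]>,
  dispatch: (em: EntityManagerLike, task: unknown) => Promise<void>
): Promise<void> {
  let tries = 1
  while (true) {
    // Fork a fresh EntityManager per iteration so each poll works with its
    // own identity map instead of one long-lived instance.
    const em = emRaw.fork()
    try {
      const tasks = await fetchQueuedTasks(em)
      if (tasks.length > 0) {
        tries = 0 // work found: reset the backoff
        for (const task of tasks) {
          await dispatch(em, task)
        }
      } else if (tries < 100) {
        tries++ // queue empty: back off a little further each round
      }
    } catch (e) {
      console.warn(e)
    }
    // 10 ms per accumulated try, capped at ~1 s, mirroring delay(tries * 10).
    await delay(tries * 10)
  }
}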