Skip to content

Commit

Permalink
feat(service/comfyui): fix blob save to em cause mem leak
Browse files Browse the repository at this point in the history
  • Loading branch information
tctien342 committed Jan 21, 2025
1 parent 7a89bb1 commit 54f0042
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 11 deletions.
25 changes: 20 additions & 5 deletions server/services/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { RedisService } from './redis'

export type TCachingKeyMap = {
CLIENT_STATUS: CustomEvent<EClientStatus>
CLIENT_LOG: CustomEvent<{ m: string; t: number }>
CLIENT_LOG: CustomEvent<{ m: string; t: string }>
SYSTEM_MONITOR: CustomEvent<TMonitorEvent>
LAST_TASK_CLIENT: CustomEvent<number>
PREVIEW: CustomEvent<{ blob64: string }>
Expand All @@ -24,6 +24,7 @@ enum ECachingType {
MEMORY = 'memory',
REDIS = 'redis'
}
const PERSISTENT_KEYS: (keyof TCachingKeyMap)[] = ['CLIENT_STATUS', 'LAST_TASK_CLIENT', 'USER_BALANCE']

class CachingService extends EventTarget {
private logger: Logger
Expand Down Expand Up @@ -74,7 +75,7 @@ class CachingService extends EventTarget {
}
}

async set(category: keyof TCachingKeyMap, id: string | number, value: any) {
async emit<K extends keyof TCachingKeyMap>(category: K, id: string | number, value: TCachingKeyMap[K]['detail']) {
const key = `${category}:${id}`
switch (this.cache.type) {
case ECachingType.MEMORY: {
Expand All @@ -87,19 +88,33 @@ class CachingService extends EventTarget {
}
})
)
this.cache.client.set(key, JSON.stringify(value))
break
}
case ECachingType.REDIS: {
await Promise.all([
this.cache.client.redis.publish(key, JSON.stringify(value)),
this.cache.client.redis.publish(category, JSON.stringify({ id, value })),
this.cache.client.redis.set(key, JSON.stringify(value))
this.cache.client.redis.publish(category, JSON.stringify({ id, value }))
])
}
}
}

async set<K extends keyof TCachingKeyMap>(category: K, id: string | number, value: TCachingKeyMap[K]['detail']) {
const key = `${category}:${id}`
if (PERSISTENT_KEYS.includes(category)) {
switch (this.cache.type) {
case ECachingType.MEMORY: {
this.cache.client.set(key, JSON.stringify(value))
break
}
case ECachingType.REDIS: {
await this.cache.client.redis.set(key, JSON.stringify(value))
}
}
}
this.emit(category, id, value)
}

async get<K extends keyof TCachingKeyMap>(
category: K,
id: string | number
Expand Down
62 changes: 56 additions & 6 deletions server/services/comfyui.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,43 @@ export class ComfyPoolInstance {
}
}

private handleFileOutput = async (
fileBlob: Blob,
info: {
key: string
idx: number
task: WorkflowTask
workflow: Workflow
},
attachment: AttachmentService,
em: Awaited<ReturnType<Awaited<ReturnType<typeof MikroORMInstance.getInstance>['getEM']>>>
) => {
const { key, idx, task, workflow } = info
const buff = Buffer.from(await fileBlob.arrayBuffer())
const extension = mine.getExtension(fileBlob.type)
const tmpName = extension ? `${task.id}_${key}_${idx}.${extension}` : `${task.id}_${key}_${idx}`

const uploaded = await attachment.uploadFile(buff, `${tmpName}`)
if (uploaded) {
const fileInfo = await attachment.getFileURL(tmpName)
const outputAttachment = em.create(
Attachment,
{
fileName: tmpName,
size: buff.byteLength,
type: EValueType.File,
storageType: fileInfo?.type === EAttachmentType.LOCAL ? EStorageType.LOCAL : EStorageType.S3,
status: EAttachmentStatus.UPLOADED,
task,
workflow
},
{ partial: true }
)
em.persist(outputAttachment)
return outputAttachment.id
}
}

private async pickingJob() {
let tries = 1
const pool = this.pool
Expand Down Expand Up @@ -421,20 +458,33 @@ export class ComfyPoolInstance {
if (Array.isArray(tmpOutput[key])) {
tmpOutput[key] = (await Promise.all(
tmpOutput[key].map(async (v, idx) => {
console.log('out', key, v)
if (v instanceof Blob) {
// Check if v is Video, Image or others
const blobType = classifyBlob(v)
switch (blobType) {
case 'image': {
await this.handleImageOutput(v, { key, idx, task, workflow }, attachment, em)
break
return await this.handleImageOutput(
v,
{ key, idx, task, workflow },
attachment,
em
)
}
case 'video': {
await this.handleVideoOutput(v, { key, idx, task, workflow }, attachment, em)
break
return await this.handleVideoOutput(
v,
{ key, idx, task, workflow },
attachment,
em
)
}
default: {
return await this.handleFileOutput(
v,
{ key, idx, task, workflow },
attachment,
em
)
}
}
}
Expand All @@ -444,7 +494,7 @@ export class ComfyPoolInstance {
}
}
const outputConfig = workflow.mapOutput
const outputData = Object.keys(outputConfig || {}).reduce(
let outputData = Object.keys(outputConfig || {}).reduce(
(acc, val) => {
if (tmpOutput[val] && outputConfig?.[val]) {
acc[val] = {
Expand Down

0 comments on commit 54f0042

Please sign in to comment.