Skip to content

Commit cbd2657

Browse files
authored
Merge pull request #19 from hackmdio/fix/only-check-for-orphan-with-conditions
fix/move orphan check from connection to worker
2 parents fbc53ea + 9438c8c commit cbd2657

File tree

2 files changed

+51
-49
lines changed

2 files changed

+51
-49
lines changed

src/api.js

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -115,51 +115,19 @@ export class Api {
115115
this.redisMinMessageLifetime = number.parseInt(env.getConf('redis-min-message-lifetime') || '60000')
116116
this.redisWorkerStreamName = this.prefix + ':worker'
117117
this.redisWorkerGroupName = this.prefix + ':worker'
118+
this.workerSetName = `${this.prefix}:worker:${this.consumername}:idset`
118119
this._destroyed = false
119120
this.redis = redis.createClient({
120121
url,
121122
// scripting: https://github.com/redis/node-redis/#lua-scripts
122123
scripts: {
123-
checkAndRecoverWorkerStream: redis.defineScript({
124-
NUMBER_OF_KEYS: 1,
125-
SCRIPT: `
126-
local found = false
127-
local messages = redis.call("XREAD", "COUNT", 0, "STREAMS", "${this.redisWorkerStreamName}", "0-0")
128-
129-
if messages and #messages > 0 then
130-
local entries = messages[1][2]
131-
for _, entry in ipairs(entries) do
132-
-- Each entry is an array where entry[2] is the message fields
133-
if entry[2][2] == KEYS[1] then
134-
found = true
135-
break
136-
end
137-
end
138-
end
139-
140-
-- If stream not found in y:worker and the stream exists, add it
141-
if not found and redis.call("TYPE", KEYS[1]).ok == "stream" then
142-
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
143-
end
144-
`,
145-
/**
146-
* @param {string} key
147-
*/
148-
transformArguments (key) {
149-
return [key]
150-
},
151-
/**
152-
* @param {null} x
153-
*/
154-
transformReply (x) {
155-
return x
156-
}
157-
}),
158124
addMessage: redis.defineScript({
159125
NUMBER_OF_KEYS: 1,
160126
SCRIPT: `
161127
if redis.call("EXISTS", KEYS[1]) == 0 then
162128
redis.call("XADD", "${this.redisWorkerStreamName}", "*", "compact", KEYS[1])
129+
elseif redis.call("XLEN", KEYS[1]) > 100 then
130+
redis.call("SADD", "${this.prefix}:worker:checklist", KEYS[1])
163131
end
164132
redis.call("XADD", KEYS[1], "*", "m", ARGV[1])
165133
`,
@@ -246,16 +214,6 @@ export class Api {
246214
return this.redis.addMessage(computeRedisRoomStreamName(room, docid, this.prefix), m)
247215
}
248216

249-
/**
250-
* @param {string} room
251-
* @param {string} docid
252-
*/
253-
async checkAndRecoveryWorkerStream (room, docid) {
254-
await this.redis.checkAndRecoverWorkerStream(
255-
computeRedisRoomStreamName(room, docid, this.prefix)
256-
)
257-
}
258-
259217
/**
260218
* @param {string} room
261219
* @param {string} docid
@@ -327,13 +285,15 @@ export class Api {
327285
})
328286
}
329287
tasks.length > 0 && logWorker('Accepted tasks ', { tasks })
288+
if (this.redis.isOpen) await this.redis.expire(this.workerSetName, 60 * 5)
330289
let reclaimCounts = 0
331290
await promise.all(tasks.map(async task => {
332291
const streamlen = await this.redis.xLen(task.stream)
333292
if (streamlen === 0) {
334293
await this.redis.multi()
335294
.xDelIfEmpty(task.stream)
336295
.xDel(this.redisWorkerStreamName, task.id)
296+
.sRem(this.workerSetName, task.stream)
337297
.exec()
338298
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
339299
} else {
@@ -348,6 +308,7 @@ export class Api {
348308
.xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime)
349309
.xAdd(this.redisWorkerStreamName, '*', { compact: task.stream })
350310
.xDel(this.redisWorkerStreamName, task.id)
311+
.sAdd(this.workerSetName, task.stream)
351312
.exec()
352313
])
353314
logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime })
@@ -413,11 +374,53 @@ export class Worker {
413374
await promise.wait(client.redisWorkerTimeout - (now - prev))
414375
}
415376
prev = now
377+
378+
await this.checkAndRecoverOrphanStreams()
416379
} catch (e) {
417380
console.error(e)
418381
}
419382
}
420383
logWorker('Ended worker process ', { id: client.consumername })
421384
})()
422385
}
386+
387+
async checkAndRecoverOrphanStreams () {
388+
if (!this.client.redis.isOpen) return
389+
const rawConsumers = await this.client.redis.xInfoConsumers(
390+
this.client.redisWorkerStreamName,
391+
this.client.redisWorkerGroupName
392+
)
393+
const consumers = rawConsumers
394+
.filter((c) => c.pending > 0 || c.inactive < this.client.redisWorkerTimeout)
395+
.map(({ name }) => name)
396+
const leader = consumers.sort()[0]
397+
if (this.client.consumername !== leader) return
398+
399+
/** @type {Set<string>} */
400+
const processingSet = new Set()
401+
for (const consumer of consumers) {
402+
const ids = await this.client.redis.sMembers(
403+
`${this.client.prefix}:worker:${consumer}:idset`
404+
)
405+
for (const id of ids) processingSet.add(id)
406+
}
407+
408+
const checkListKey = `${this.client.prefix}:worker:checklist`
409+
const checklist = await this.client.redis.sMembers(checkListKey)
410+
411+
const orphans = checklist.filter((room) => !processingSet.has(room))
412+
if (!orphans) return
413+
logWorker(`adding ${orphans.length} orphans back to worker`)
414+
415+
await Promise.all(
416+
orphans.map((room) =>
417+
this.client.redis.xAdd(
418+
this.client.redisWorkerStreamName,
419+
'*',
420+
{ compact: room }
421+
)
422+
)
423+
)
424+
await this.client.redis.del(checkListKey)
425+
}
423426
}

src/y-socket-io/y-socket-io.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ export class YSocketIO {
157157
this.initAwarenessListeners(socket)
158158
this.initSocketListeners(socket)
159159

160-
await this.client.checkAndRecoveryWorkerStream(namespace, 'index')
161160
const doc = await this.client.getDoc(namespace, 'index')
162161

163162
if (
@@ -216,7 +215,7 @@ export class YSocketIO {
216215
this.getNamespaceString(socket.nsp),
217216
'index',
218217
Buffer.from(this.toRedis('sync-update', message))
219-
)
218+
).catch(console.error)
220219
})
221220
}
222221

@@ -240,7 +239,7 @@ export class YSocketIO {
240239
this.getNamespaceString(socket.nsp),
241240
'index',
242241
Buffer.from(this.toRedis('awareness-update', new Uint8Array(message)))
243-
)
242+
).catch(console.error)
244243
})
245244
}
246245

@@ -285,7 +284,7 @@ export class YSocketIO {
285284
this.getNamespaceString(socket.nsp),
286285
'index',
287286
Buffer.from(this.toRedis('sync-step-2', message))
288-
)
287+
).catch(console.error)
289288
}
290289
)
291290
if (doc.awareness.states.size > 0) {

0 commit comments

Comments
 (0)