Skip to content

Commit d18651e

Browse files
committed
fix: move ack to redis transactions
1 parent a87d566 commit d18651e

File tree

1 file changed

+2
-13
lines changed

1 file changed

+2
-13
lines changed

src/api.js

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -301,15 +301,10 @@ export class Api {
301301
await this.redis.multi()
302302
.xDelIfEmpty(task.stream)
303303
.xDel(this.redisWorkerStreamName, task.id)
304+
.xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id)
304305
.sRem(this.workerSetName, task.stream)
305306
.exec()
306307
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })
307-
308-
await this.redis.xAck(
309-
this.redisWorkerStreamName,
310-
this.redisWorkerGroupName,
311-
task.id
312-
)
313308
} else {
314309
reclaimCounts++
315310
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
@@ -325,16 +320,10 @@ export class Api {
325320
.xTrim(task.stream, 'MINID', lastId - this.redisMinMessageLifetime)
326321
.xAdd(this.redisWorkerStreamName, '*', { compact: task.stream })
327322
.xDel(this.redisWorkerStreamName, task.id)
323+
.xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id)
328324
.sAdd(this.workerSetName, task.stream)
329325
.exec()
330326
])
331-
332-
await this.redis.xAck(
333-
this.redisWorkerStreamName,
334-
this.redisWorkerGroupName,
335-
task.id
336-
)
337-
338327
logWorker('Compacted stream ', { stream: task.stream, taskId: task.id, newLastId: lastId - this.redisMinMessageLifetime })
339328
try {
340329
if (ydocUpdateCallback != null) {

0 commit comments

Comments
 (0)