diff --git a/examples/redis-pending/index.js b/examples/redis-pending/index.js new file mode 100644 index 0000000..707e169 --- /dev/null +++ b/examples/redis-pending/index.js @@ -0,0 +1,58 @@ +"use strict"; + +// Adapted from: https://github.com/moleculerjs/moleculer-channels/issues/74 + +const { ServiceBroker } = require("moleculer"); +const ChannelsMiddleware = require("../../types").Middleware; +const broker = new ServiceBroker({ + namespace: "test", + nodeID: "test1", + transporter: "TCP", + middlewares: [ + ChannelsMiddleware({ + adapter: "redis://127.0.0.1:6379" + }) + ] +}); + +const serviceSchema = { + name: "subscriber", + channels: { + "order.created": { + group: "mygroup", + redis: { + minIdleTime: 1000, + claimInterval: 1, + startID: "0" + }, + maxRetries: 100, + handler(payload) { + this.logger.info("Received order.created event", payload); + throw new Error(); + } + } + } +}; +broker.createService(serviceSchema); + +// Start the Moleculer broker +broker.start().then(async () => { + try { + broker.repl(); + + for (let i = 0; i < 10; i++) { + await broker.sendToChannel("order.created", { id: i, items: "test" }); + + await broker.Promise.delay(100); + } + + await broker.destroyService("subscriber"); + + setTimeout(() => { + broker.logger.info("Recreate service"); + broker.createService(serviceSchema); + }, 10000); + } catch (error) { + console.log(error); + } +}); diff --git a/src/adapters/redis.js b/src/adapters/redis.js index 3f24754..360b793 100644 --- a/src/adapters/redis.js +++ b/src/adapters/redis.js @@ -505,15 +505,32 @@ class RedisAdapter extends BaseAdapter { }) .then(() => { const pubClient = this.clients.get(this.pubName); - // 1. Delete consumer from the consumer group - // 2. Do NOT destroy the consumer group - // https://redis.io/commands/XGROUP - return pubClient.xgroup( - "DELCONSUMER", - chan.name, // Stream Name - chan.group, // Consumer Group - chan.id // Consumer ID - ); + return pubClient + .xpending( + chan.name, + chan.group, + "-", // Start + "+", // End + 10 // Max reported entries + ) + .then(pending => { + if (pending.length !== 0) { + // Don't destroy the consumer group if there are pending messages + // It might come back online in the future and process the messages + // More info: https://github.com/moleculerjs/moleculer-channels/issues/74 + return; + } + + // 1. Delete consumer from the consumer group + // 2. Do NOT destroy the consumer group + // https://redis.io/commands/XGROUP + return pubClient.xgroup( + "DELCONSUMER", + chan.name, // Stream Name + chan.group, // Consumer Group + chan.id // Consumer ID + ); + }); }) .then(() => resolve()) .catch(err => reject(err));