From b1b5d5ba59508816f6ad9428e1278680f9984d5e Mon Sep 17 00:00:00 2001 From: Assaf Oren Date: Sun, 6 Dec 2020 09:02:18 +0200 Subject: [PATCH] Handle 'offsetOutOfRange' error in consumer (similar to the handling in consumerGroup) --- lib/consumer.js | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/lib/consumer.js b/lib/consumer.js index d85a7351..4363feb8 100644 --- a/lib/consumer.js +++ b/lib/consumer.js @@ -100,6 +100,26 @@ Consumer.prototype.connect = function () { self.fetch(); }); }); + + this.on('offsetOutOfRange', err => { + self.pause(); + const offsetObj = new Offset(self.client); + // we can ignore this since we are already forwarding error event emitted from client + offsetObj.on('error', _.noop); + offsetObj.fetch(self.payloads, (err, serverOffsets) => { + if (err) { + self.emit('error', err); + return; + } + self.payloads.forEach(function (payload) { + let offset = _.head(serverOffsets[payload.topic][payload.partition]); + if (offset === -1) offset = 0; + payload.offset = offset; + logger.debug(`Fixing offset for topic '${payload.topic}' partition '${payload.partition}' to ${offset}`); + }); + self.resume(); + }); + }); }; Consumer.prototype.init = function () {