diff --git a/README.md b/README.md index 38041d03..8b971b15 100644 --- a/README.md +++ b/README.md @@ -1092,18 +1092,6 @@ Result: # Troubleshooting / FAQ -## HighLevelProducer with KeyedPartitioner errors on first send - -Error: - -``` -BrokerNotAvailableError: Could not find the leader -``` - -Call `client.refreshMetadata()` before sending the first message. Reference issue [#354](https://github.com/SOHU-Co/kafka-node/issues/354) - - - ## How do I debug an issue? This module uses the [debug module](https://github.com/visionmedia/debug) so you can just run below before starting your app. diff --git a/lib/baseProducer.js b/lib/baseProducer.js index 12cb9f7d..476a011d 100644 --- a/lib/baseProducer.js +++ b/lib/baseProducer.js @@ -122,8 +122,19 @@ BaseProducer.prototype.send = function (payloads, cb) { var client = this.client; var requireAcks = this.requireAcks; var ackTimeoutMs = this.ackTimeoutMs; + const isMissingTopicMetadata = !Object.keys(client.topicMetadata).length; - client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb); + if (isMissingTopicMetadata) { + client.refreshMetadata(_.map(payloads, 'topic'), (err) => { + if (err) { + cb(err); + } + + client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb); + }); + } else { + client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb); + } }; BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {