From 51b6292340b12af423c1e331ab5eda0f3beb1852 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Thu, 13 Jun 2019 17:24:41 +0200 Subject: [PATCH] Fix describe configs for multiple brokers (#1772) (#1280) This fixes describe configs when requesting broker configs for clusters with more than 1 broker. Instead of sending all requests to all brokers, requests for broker configs are sent to the specific broker. Topic config requests can go to any broker (follows the logic of the Java SDK). Also reworked a bit to fit better into api support functionality. Fixes #1172. --- lib/kafkaClient.js | 64 +++++++++++++++++++------------- lib/protocol/protocol.js | 32 +++++++++++++--- lib/protocol/protocolVersions.js | 6 ++- test/test.admin.js | 2 +- 4 files changed, 72 insertions(+), 32 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 4da3c704..98164574 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -1379,6 +1379,12 @@ KafkaClient.prototype.describeConfigs = function (payload, callback) { return callback(new Error('Client is not ready (describeConfigs)')); } let err; + + // Broker resource requests must go to the specific node + // other requests can go to any node + const brokerResourceRequests = []; + const nonBrokerResourceRequests = []; + _.forEach(payload.resources, function (resource) { if (resourceTypeMap[resource.resourceType] === undefined) { err = new Error(`Unexpected resource type ${resource.resourceType} for resource ${resource.resourceName}`); @@ -1386,39 +1392,47 @@ KafkaClient.prototype.describeConfigs = function (payload, callback) { } else { resource.resourceType = resourceTypeMap[resource.resourceType]; } + + if (resource.resourceType === resourceTypeMap['broker']) { + brokerResourceRequests.push(resource); + } else { + nonBrokerResourceRequests.push(resource); + } }); + if (err) { return callback(err); } - const brokers = this.brokerMetadata; - async.mapValuesLimit( - brokers, - this.options.maxAsyncRequests, - (brokerMetadata, brokerId, cb) => { - const broker = this.brokerForLeader(brokerId); - if (!broker || !broker.isConnected()) { - return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)')); - } - - const correlationId = this.nextId(); - let apiVersion = 0; - if (broker.apiSupport && broker.apiSupport.describeConfigs) { - apiVersion = broker.apiSupport.describeConfigs.max; + async.parallelLimit([ + (cb) => { + if (nonBrokerResourceRequests.length > 0) { + this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb); + } else { + cb(null, []); } - apiVersion = Math.min(apiVersion, 2); - const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion); - this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb); }, - (err, results) => { - if (err) { - callback(err); - return; - } - results = _.values(results); - callback(null, _.merge.apply({}, results)); + ...brokerResourceRequests.map(r => { + return (cb) => { + this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb); + }; + }) + ], this.options.maxAsyncRequests, (err, result) => { + if (err) { + return callback(err); } - ); + + callback(null, _.flatten(result)); + }); +}; + +/** + * Sends a request to any broker in the cluster + */ +KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) { + // For now just select the first broker + const brokerId = Object.keys(this.brokerMetadata)[0]; + this.sendRequestToBroker(brokerId, requestType, args, callback); }; module.exports = KafkaClient; diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index 445c10e2..c86cb7e9 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -1668,7 +1668,19 @@ function decodeVersionsResponse (resp) { return error || versions; } -function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) { +function encodeDescribeConfigsRequest (clientId, correlationId, payload) { + return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 0); +} + +function encodeDescribeConfigsRequestV1 (clientId, correlationId, payload) { + return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 1); +} + +function encodeDescribeConfigsRequestV2 (clientId, correlationId, payload) { + return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 2); +} + +function _encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) { let request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeConfigs, apiVersion); const resources = payload.resources; request.Int32BE(resources.length); @@ -1692,10 +1704,16 @@ function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVers return encodeRequestWithLength(request.make()); } -function decodeDescribeConfigsResponse (apiVersion) { - return function (resp) { - return _decodeDescribeConfigsResponse(resp, apiVersion); - }; +function decodeDescribeConfigsResponse (resp) { + return _decodeDescribeConfigsResponse(resp, 0); +} + +function decodeDescribeConfigsResponseV1 (resp) { + return _decodeDescribeConfigsResponse(resp, 1); +} + +function decodeDescribeConfigsResponseV2 (resp) { + return _decodeDescribeConfigsResponse(resp, 2); } function _decodeDescribeConfigsResponse (resp, apiVersion) { @@ -1858,4 +1876,8 @@ exports.decodeListGroups = decodeListGroups; exports.encodeVersionsRequest = encodeVersionsRequest; exports.decodeVersionsResponse = decodeVersionsResponse; exports.encodeDescribeConfigsRequest = encodeDescribeConfigsRequest; +exports.encodeDescribeConfigsRequestV1 = encodeDescribeConfigsRequestV1; +exports.encodeDescribeConfigsRequestV2 = encodeDescribeConfigsRequestV2; exports.decodeDescribeConfigsResponse = decodeDescribeConfigsResponse; +exports.decodeDescribeConfigsResponseV1 = decodeDescribeConfigsResponseV1; +exports.decodeDescribeConfigsResponseV2 = decodeDescribeConfigsResponseV2; diff --git a/lib/protocol/protocolVersions.js b/lib/protocol/protocolVersions.js index b045fd3f..4666a361 100644 --- a/lib/protocol/protocolVersions.js +++ b/lib/protocol/protocolVersions.js @@ -50,7 +50,11 @@ const API_MAP = { [p.encodeCreateTopicRequestV1, p.decodeCreateTopicResponseV1] ], deleteTopics: null, - describeConfigs: [[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse]], + describeConfigs: [ + [p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse], + [p.encodeDescribeConfigsRequestV1, p.decodeDescribeConfigsResponseV1], + [p.encodeDescribeConfigsRequestV2, p.decodeDescribeConfigsResponseV2] + ], saslAuthenticate: [[p.encodeSaslAuthenticationRequest, p.decodeSaslAuthenticationResponse]] }; diff --git a/test/test.admin.js b/test/test.admin.js index 2fdc138a..f22b4a91 100644 --- a/test/test.admin.js +++ b/test/test.admin.js @@ -188,7 +188,7 @@ describe('Admin', function () { }; admin.describeConfigs(payload, function (error, res) { should.not.exist(res); - error.should.have.property('message').and.containEql('Unexpected broker id'); + error.should.have.property('message').and.containEql('No broker with id ' + brokerId); done(); }); });