
localhost used despite remote host being set in BOOTSTRAP_SERVERS_CONFIG #16

Open
ludenus opened this issue Apr 11, 2018 · 1 comment

@ludenus

ludenus commented Apr 11, 2018

Hi,
I am trying to use the provided examples. They work perfectly against a local Kafka broker, but they do not work with a remote Kafka.

Even though I am setting

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node-perf-01.ops.com:9092", // list of Kafka broker hostname and port pairs

Gatling attempts to connect to localhost and then fails with an exception:

2018-04-11 14:35:43,977 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.Metadata.update(Metadata.java:241) - Updated cluster metadata version 2 to Cluster(id = hfosHSPVSyi-Cyz1JfSsHQ, nodes = [localhost:9092 (id: 1 rack: null)], partitions = [Partition(topic = events, partition = 0, leader = 1, replicas = [1,], isr = [1,])])
2018-04-11 14:35:43,986 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.initiateConnect(NetworkClient.java:496) - Initiating connection to node 1 at localhost:9092.

QUESTION: Am I missing some config elsewhere?

NOTE: I have seen a similar issue here: zendesk/maxwell#360
But I am not sure whether it is relevant. It is also not clear to me how to configure advertised.listeners from Gatling, since it is not a producer option...

Please advise.

package load

import java.util.Date

import com.github.mnogu.gatling.kafka.Predef._
import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig

class KafkaSimulation extends Simulation {

  val kafkaConf = kafka
    .topic("test_events") // Kafka topic name
    .properties( // Kafka producer configs
    Map(
      ProducerConfig.ACKS_CONFIG -> "1",

      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node-perf-01.ops.com:9092", // list of Kafka broker hostname and port pairs

      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringSerializer", // in most cases, StringSerializer or ByteArraySerializer

      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringSerializer"
    )
  )

  val scn = scenario("Kafka Test")
    .exec(
      kafka("request")
        // message to send
        .send[String]("foo" + (new Date()).toString))


  val loadProfile = scn.inject(
    rampUsers(10) over (5)
  )

  setUp(loadProfile)
    .protocols(kafkaConf)
    .maxDuration(20)


}

2018-04-11 14:35:43,488 [INFO] [pool-1-thread-1] o.a.k.c.p.ProducerConfig.logAll(AbstractConfig.java:180) - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [node-perf-01.ops.com:9092]
	buffer.memory = 33554432
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2018-04-11 14:35:43,493 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name bufferpool-wait-time
2018-04-11 14:35:43,495 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name buffer-exhausted-records
2018-04-11 14:35:43,496 [DEBUG] [pool-1-thread-1] o.a.k.c.Metadata.update(Metadata.java:241) - Updated cluster metadata version 1 to Cluster(id = null, nodes = [node-perf-01.ops.com:9092 (id: -1 rack: null)], partitions = [])
2018-04-11 14:35:43,502 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name connections-closed:
2018-04-11 14:35:43,502 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name connections-created:
2018-04-11 14:35:43,503 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name bytes-sent-received:
2018-04-11 14:35:43,503 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name bytes-sent:
2018-04-11 14:35:43,504 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name bytes-received:
2018-04-11 14:35:43,504 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name select-time:
2018-04-11 14:35:43,504 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name io-time:
2018-04-11 14:35:43,511 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name batch-size
2018-04-11 14:35:43,511 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name compression-rate
2018-04-11 14:35:43,511 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name queue-time
2018-04-11 14:35:43,512 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name request-time
2018-04-11 14:35:43,512 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name produce-throttle-time
2018-04-11 14:35:43,512 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name records-per-request
2018-04-11 14:35:43,513 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name record-retries
2018-04-11 14:35:43,513 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name errors
2018-04-11 14:35:43,513 [DEBUG] [pool-1-thread-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name record-size-max
2018-04-11 14:35:43,514 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.p.i.Sender.run(Sender.java:130) - Starting Kafka producer I/O thread.
2018-04-11 14:35:43,515 [INFO] [pool-1-thread-1] o.a.k.c.u.AppInfoParser.<init>(AppInfoParser.java:83) - Kafka version : 0.10.1.1
2018-04-11 14:35:43,516 [INFO] [pool-1-thread-1] o.a.k.c.u.AppInfoParser.<init>(AppInfoParser.java:84) - Kafka commitId : f10ef2720b03b247
2018-04-11 14:35:43,516 [DEBUG] [pool-1-thread-1] o.a.k.c.p.KafkaProducer.<init>(KafkaProducer.java:332) - Kafka producer started
Simulation load.JbtSimulation started...
2018-04-11 14:35:43,597 [DEBUG] [GatlingSystem-akka.actor.default-dispatcher-3] i.g.c.c.Controller.io$gatling$core$controller$Controller$$anonfun$3$$$anonfun$2(Controller.scala:59) - Setting up max duration
2018-04-11 14:35:43,617 [DEBUG] [GatlingSystem-akka.actor.default-dispatcher-4] i.g.c.c.i.Injector.startUser(Injector.scala:131) - Start user #1
2018-04-11 14:35:43,623 [DEBUG] [GatlingSystem-akka.actor.default-dispatcher-4] i.g.c.c.i.Injector.injectStreams(Injector.scala:123) - Injecting 1 users, continue=false
2018-04-11 14:35:43,624 [INFO] [GatlingSystem-akka.actor.default-dispatcher-3] i.g.c.c.Controller.applyOrElse(Controller.scala:77) - InjectionStopped expectedCount=1
2018-04-11 14:35:43,631 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.maybeUpdate(NetworkClient.java:644) - Initialize connection to node -1 for sending metadata request
2018-04-11 14:35:43,631 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.initiateConnect(NetworkClient.java:496) - Initiating connection to node -1 at node-perf-01.ops.com:9092.
2018-04-11 14:35:43,748 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node--1.bytes-sent
2018-04-11 14:35:43,748 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node--1.bytes-received
2018-04-11 14:35:43,748 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node--1.latency
2018-04-11 14:35:43,749 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.n.Selector.pollSelectionKeys(Selector.java:327) - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2018-04-11 14:35:43,749 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.handleConnections(NetworkClient.java:476) - Completed connection to node -1
2018-04-11 14:35:43,852 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.maybeUpdate(NetworkClient.java:640) - Sending metadata request {topics=[events]} to node -1
2018-04-11 14:35:43,977 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.Metadata.update(Metadata.java:241) - Updated cluster metadata version 2 to Cluster(id = hfosHSPVSyi-Cyz1JfSsHQ, nodes = [localhost:9092 (id: 1 rack: null)], partitions = [Partition(topic = events, partition = 0, leader = 1, replicas = [1,], isr = [1,])])
2018-04-11 14:35:43,986 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.initiateConnect(NetworkClient.java:496) - Initiating connection to node 1 at localhost:9092.
2018-04-11 14:35:43,987 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node-1.bytes-sent
2018-04-11 14:35:43,987 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node-1.bytes-received
2018-04-11 14:35:43,988 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.m.Metrics.sensor(Metrics.java:296) - Added sensor with name node-1.latency
2018-04-11 14:35:43,988 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.n.Selector.pollSelectionKeys(Selector.java:365) - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
	at java.lang.Thread.run(Thread.java:748)
2018-04-11 14:35:43,988 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.handleDisconnections(NetworkClient.java:463) - Node 1 disconnected.
2018-04-11 14:35:44,039 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.NetworkClient.initiateConnect(NetworkClient.java:496) - Initiating connection to node 1 at localhost:9092.
2018-04-11 14:35:44,039 [DEBUG] [kafka-producer-network-thread | producer-1] o.a.k.c.n.Selector.pollSelectionKeys(Selector.java:365) - Connection with localhost/127.0.0.1 disconnected
@mpastecki

advertised.listeners is a Kafka broker config, not a producer option. Whatever you configure in ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is only used for the initial bootstrap step. The broker then sends back the list of all brokers that are part of the cluster, and those addresses are whatever is configured in advertised.listeners on the brokers.

It seems your broker has advertised.listeners set to localhost.
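
To illustrate, here is a minimal, hypothetical server.properties sketch for the broker on node-perf-01.ops.com. The hostname is taken from the bootstrap config above; the listener values are an assumption about your broker setup, not something taken from this repository:

# hypothetical broker config sketch -- adjust to your actual network setup
# address the broker binds to on the host
listeners=PLAINTEXT://0.0.0.0:9092
# address returned to clients in metadata responses; this is what the
# Gatling producer will try to connect to after the bootstrap step
advertised.listeners=PLAINTEXT://node-perf-01.ops.com:9092

With something like that in place (and the broker restarted), the metadata update in the debug log above should list node-perf-01.ops.com:9092 for node 1 instead of localhost:9092, and no change should be needed on the Gatling/producer side.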
