This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG]Error parsing request header. Our best guess of the apiKey is: 116 #145

Open
tsywkGo opened this issue Apr 20, 2020 · 5 comments

Comments

@tsywkGo
Contributor

tsywkGo commented Apr 20, 2020

Describe the bug
After configuring KoP on Pulsar, producing messages with the Golang Shopify/sarama Kafka client fails. Pulsar reports: Error parsing request header. Our best guess of the apiKey is: 116. The sarama Kafka client reports: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

To Reproduce
Steps to reproduce the behavior:

  1. I configured etc/pulsar/standalone.conf and Pulsar started normally. The relevant configuration is:
### For kop config
messagingProtocols=kafka
protocolHandlerDirectory=share/java/pulsar/potocols

saslAllowedMechanisms=PLAIN

# Configuration to enable authentication and authorization
# Enable authentication
authenticationEnabled=true
# Enforce authorization
authorizationEnabled=true
  2. My Golang sarama Kafka producer program is as follows:
package main

import (
	"fmt"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// SASL credentials used against the KoP listener.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "test_app/data"
	config.Net.SASL.Password = "token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0X3VzZXIifQ.qWTV0-8T8DrNzxKIHhRc9BoWxiVUxCdiB2TtKsMC9yo"

	// Wait for all in-sync replicas, pick partitions at random, and
	// report successes (required by the sync producer).
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	// Kafka protocol version the client should speak.
	config.Version = sarama.V2_0_1_0

	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Printf("unable to create kafka producer: %q\n", err)
		os.Exit(-1)
	}
	defer producer.Close()

	// Build a single keyed message for the app_binlog topic.
	msg := &sarama.ProducerMessage{
		Topic: "app_binlog",
		Value: sarama.ByteEncoder([]byte("KoP Produce 测试")),
		Key:   sarama.StringEncoder("key"),
	}

	_, _, err = producer.SendMessage(msg)
	if err != nil {
		fmt.Printf("unable to send msg to kafka: %q\n", err)
		os.Exit(-1)
	}

	fmt.Printf("KoP 测试 produce msg success\n")
}

Running the program fails with: unable to create kafka producer: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
I tried setting config.Version to sarama.V2_0_1_0, sarama.V2_0_0_0, and sarama.V2_1_0_0; all of them produce the same error.
  3. While the producer program is running, Pulsar logs this error: Error parsing request header. Our best guess of the apiKey is: 116

Expected behavior
How can I resolve this?

Screenshots
This is the detailed Pulsar error log:

13:09:50.436 [pulsar-io-50-6] INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - channel inactive [id: 0xec79fc2c, L:/10.55.3.187:9092 ! R:/192.168.65.5:61857]
13:09:50.436 [pulsar-io-50-6] INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - close channel [id: 0xec79fc2c, L:/10.55.3.187:9092 ! R:/192.168.65.5:61857]
13:09:50.800 [pulsar-io-50-8] INFO  io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler - KafkaProtocolHandler listeners PLAINTEXT://ali-c-inf-testing01.bj:9092 not contains type SSL
13:09:50.800 [pulsar-io-50-8] INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - channel active: [id: 0xbcd72325, L:/10.55.3.187:9092 - R:/192.168.65.5:61858]
13:09:50.859 [pulsar-io-50-8] ERROR io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - Caught error in handler, closing channel
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 116
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'client_id': Error reading string of length 28719, only 105 bytes available
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) ~[?:?]
	at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:121) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.byteBufToRequest(KafkaCommandDecoder.java:96) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:135) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) [io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) [io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) [io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) [io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
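
One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: the bytes the broker tried to parse look like a raw SASL/PLAIN payload (a NUL byte, the user name, a NUL byte, the password) rather than a framed Kafka request. The sketch below decodes such a payload using the Kafka request header layout (api_key int16, api_version int16, correlation_id int32, client_id length int16) and arrives at the same 116 and 28719 that appear in the log. The helper name decodeAsRequestHeader and the placeholder password are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeAsRequestHeader is a hypothetical helper that interprets the first
// ten bytes of payload as the start of a Kafka request header:
// api_key (int16), api_version (int16), correlation_id (int32),
// then the int16 length prefix of client_id. All fields are big-endian.
func decodeAsRequestHeader(payload []byte) (apiKey, clientIDLen int16, ok bool) {
	if len(payload) < 10 {
		return 0, 0, false
	}
	apiKey = int16(binary.BigEndian.Uint16(payload[0:2]))
	clientIDLen = int16(binary.BigEndian.Uint16(payload[8:10]))
	return apiKey, clientIDLen, true
}

func main() {
	// Assumed SASL/PLAIN layout: NUL, user, NUL, password.
	// The user comes from the report; the password is a placeholder.
	payload := []byte("\x00" + "test_app/data" + "\x00" + "token:placeholder")

	apiKey, clientIDLen, ok := decodeAsRequestHeader(payload)
	if !ok {
		fmt.Println("payload too short")
		return
	}
	fmt.Printf("apiKey=%d clientIDLen=%d\n", apiKey, clientIDLen)
	// prints "apiKey=116 clientIDLen=28719"
}
```

If this reading is right, the client sent its credentials without wrapping them in a Kafka request. sarama exposes config.Net.SASL.Version with the constant sarama.SASLHandshakeV1 for sending them inside a SaslAuthenticate request; whether that resolves this particular report is not confirmed anywhere in the thread.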

Additional context

@sijie
Member

sijie commented Apr 23, 2020

@jiazhai Can you help with this question?

@jiazhai
Contributor

jiazhai commented Apr 23, 2020

@yuanweikang2020 Which sarama version are you using? The go.mod of the golang-sarama integration at
https://github.com/streamnative/kop/blob/master/integrations/golang-sarama/go.mod
pins this version:
require github.com/Shopify/sarama v1.24.1

@tsywkGo
Contributor Author

tsywkGo commented May 6, 2020

I switched to the specified version, but it still fails. The Pulsar error log:

15:06:18.967 [pulsar-io-50-4] WARN  io.netty.channel.ChannelInitializer - Failed to initialize a channel. Closing: [id: 0xf8263bea, L:/10.55.3.187:9092 - R:/192.168.25.254:58424]
java.lang.NoClassDefFoundError: io/streamnative/pulsar/handlers/kop/KafkaRequestHandler
	at io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.initChannel(KafkaChannelInitializer.java:72) ~[?:?]
	at io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.initChannel(KafkaChannelInitializer.java:32) ~[?:?]
	at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:956) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.access$100(DefaultChannelPipeline.java:46) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:502) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:417) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:474) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]

@jiazhai
Contributor

jiazhai commented May 6, 2020

@yuanweikang2020 It looks like the package was not loaded correctly. Is kop.nar placed under the directory share/java/pulsar/potocols? Are there any other error logs on the broker side?

@sijie
Member

sijie commented Jul 1, 2020

@yuanweikang2020 Any updates on Jia's last comment?
