-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.html
561 lines (407 loc) · 32 KB
/
kafka.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=yes">
<title>kafka</title>
<style type="text/css">
*{margin:0;padding:0;}
body {
font:13.34px helvetica,arial,freesans,clean,sans-serif;
color:black;
line-height:1.4em;
background-color: #F8F8F8;
padding: 0.7em;
}
p {
margin:1em 0;
line-height:1.5em;
}
table {
font-size:inherit;
font:100%;
margin:1em;
}
table th{border-bottom:1px solid #bbb;padding:.2em 1em;}
table td{border-bottom:1px solid #ddd;padding:.2em 1em;}
input[type=text],input[type=password],input[type=image],textarea{font:99% helvetica,arial,freesans,sans-serif;}
select,option{padding:0 .25em;}
optgroup{margin-top:.5em;}
pre,code{font:12px Monaco,"Courier New","DejaVu Sans Mono","Bitstream Vera Sans Mono",monospace;}
pre {
margin:1em 0;
font-size:12px;
background-color:#eee;
border:1px solid #ddd;
padding:5px;
line-height:1.5em;
color:#444;
overflow:auto;
-webkit-box-shadow:rgba(0,0,0,0.07) 0 1px 2px inset;
-webkit-border-radius:3px;
-moz-border-radius:3px;border-radius:3px;
}
pre code {
padding:0;
font-size:12px;
background-color:#eee;
border:none;
}
code {
font-size:12px;
background-color:#f8f8ff;
color:#444;
padding:0 .2em;
border:1px solid #dedede;
}
img{border:0;max-width:100%;}
abbr{border-bottom:none;}
a{color:#4183c4;text-decoration:none;}
a:hover{text-decoration:underline;}
a code,a:link code,a:visited code{color:#4183c4;}
h2,h3{margin:1em 0;}
h1,h2,h3,h4,h5,h6{border:0;}
h1{font-size:170%;border-top:4px solid #aaa;padding-top:.5em;margin-top:1.5em;}
h1:first-child{margin-top:0;padding-top:.25em;border-top:none;}
h2{font-size:150%;margin-top:1.5em;border-top:4px solid #e0e0e0;padding-top:.5em;}
h3{margin-top:1em;}
hr{border:1px solid #ddd;}
ul{margin:1em 0 1em 2em;}
ol{margin:1em 0 1em 2em;}
ul li,ol li{margin-top:.5em;margin-bottom:.5em;}
ul ul,ul ol,ol ol,ol ul{margin-top:0;margin-bottom:0;}
blockquote{margin:1em 0;border-left:5px solid #ddd;padding-left:.6em;color:#555;}
dt{font-weight:bold;margin-left:1em;}
dd{margin-left:2em;margin-bottom:1em;}
sup {
font-size: 0.83em;
vertical-align: super;
line-height: 0;
}
kbd {
display: inline-block;padding: 3px 5px;font-size: 11px;line-height: 10px;color: #555;vertical-align: middle;background-color: #fcfcfc;border: solid 1px #ccc;border-bottom-color: #bbb;border-radius: 3px;box-shadow: inset 0 -1px 0 #bbb;
}
* {
-webkit-print-color-adjust: exact;
}
@media screen and (min-width: 914px) {
body {
width: 854px;
margin:0 auto;
}
}
@media print {
table, pre {
page-break-inside: avoid;
}
pre {
word-wrap: break-word;
}
}
</style>
</head>
<body>
<h2 id="toc_0">Kafka 海滩拾贝</h2>
<hr>
<p><em>written by Alex Stocks on 2017/02/02</em></p>
<h3 id="toc_1">0 引言</h3>
<hr>
<p>大年初三(2017-01-30)下午15:56公司线上kafka集群(3 instances)挂了一台,导致整个线上服务瘫痪,由于正处于假期时间,用手机联系了相关同事手工重启系统且待系统服务正常后就暂时弃置一边。</p>
<p>今日稍有闲暇,赶往公司想把事故复盘一遍,以追踪事故原因。下面分别列出相关问题,并记录解决方法。</p>
<h3 id="toc_2">1 kafka启动与无法连接broker问题若干</h3>
<hr>
<p>由于测试环境机器数目有限,我便在一个测试机器启动了3个kafka实例(kafka_2.11-0.10.1.1)和1个zk实例(zookeeper-3.4.9),并写了相关python程序去连接kafka集群。</p>
<h4 id="toc_3">Q1 kafka broker无法启动</h4>
<h2 id="toc_4"></h2>
<p>broker无法启动大致有两个原因:第一是内存不足,第二是jmx无法启动。
可以通过修改kafka-server-start.sh如下一行代码来确定broker修改JVM HEAP size:</p>
<div><pre><code class="language-none">export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"</code></pre></div>
<p>把kafka-run-class.sh如下代码删除掉就可以关闭kafka的JMX:</p>
<div><pre><code class="language-none"># JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# JMX port to use
if [ $JMX_PORT ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi</code></pre></div>
<p>把JMX关掉的坏处在于一些依赖于JMX(KafkaOffsetMonitor)就无法工作了。</p>
<h4 id="toc_5">Q2 python程序无法连接kafka broker</h4>
<h2 id="toc_6"></h2>
<p>程序一直报如下错误:</p>
<div><pre><code class="language-none">kafka.errors.NoBrokersAvailable: NoBrokersAvailable</code></pre></div>
<p>首先查看了kafka集群的网络监听情况。执行命令 netstat -nlp | grep 9092 得到如下结果:</p>
<div><pre><code class="language-none">tcp6 0 0 127.0.0.1:19092 :::* LISTEN 18782/java
tcp6 0 0 127.0.0.1:29092 :::* LISTEN 19111/java
tcp6 0 0 127.0.0.1:9092 :::* LISTEN 18406/java</code></pre></div>
<p>注意到了kafka实例使用的tcp协议的版本是tcp6,google一番后发现解决方法是把如下语句加入你的bash启动脚本(.bash_profile or .bashrc):</p>
<div><pre><code class="language-none">export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"</code></pre></div>
<p>再次执行上面的命令查验后,结果如下:</p>
<div><pre><code class="language-none">tcp 0 0 127.0.0.1:19092 0.0.0.0:* LISTEN 25551/java
tcp 0 0 127.0.0.1:29092 0.0.0.0:* LISTEN 25842/java
tcp 0 0 127.0.0.1:9092 0.0.0.0:* LISTEN 25254/java</code></pre></div>
<p>客户端程序是kafka python(https://github.com/dpkp/kafka-python)写的,再次启动后报如下错误:</p>
<div><pre><code class="language-none">Traceback (most recent call last):
File "producer.py", line 34, in <module>
producer_timings['python_kafka_producer'] = python_kafka_producer_performance()
File "producer.py", line 21, in python_kafka_producer_performance
producer = KafkaProducer(bootstrap_servers=brokers)
File "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py", line 328, in __init__
**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable</code></pre></div>
<p>再次google后,在producer的参数里加上api_conf字段解决问题,修改后的代码如下:</p>
<div><pre><code class="language-none">brokers = bootstrap_servers.split(',')
producer = KafkaProducer(
bootstrap_servers=brokers,
api_version = (0, 10))</code></pre></div>
<h3 id="toc_7">2 kafka集群稳定性测试</h3>
<hr>
<p>测试环境:</p>
<ul>
<li>在一台机器上部署1个zk实例(zookeeper-3.4.8);</li>
<li>在同一台机器上部署3个kafka实例(kafka_2.11-0.10.1.1);</li>
<li>在同一台机器上部署1个kafka producer实例(基于kafka-python库,以下简称P);</li>
<li>在同一台机器上部署1个kafka consumer实例(基于kafka-python库,以下简称C);</li>
<li>topic一个,其replica为3,partition为3;</li>
</ul>
<p>测试流程:</p>
<blockquote>
<p>case 1 kill全部kafka实例然后30s内再全部重启</p>
</blockquote>
<div><pre><code class="language-none">P与C依然能正常工作,但丢失消息若干且部分乱序。</code></pre></div>
<blockquote>
<p>case 2 kill一个kafka实例然后重启之</p>
</blockquote>
<div><pre><code class="language-none">重启kafka之前,P与C都能正常工作, 但又部分消息乱序。重启kafka实例之后,60S内P与C都与新实例建立了正常连接,且partition2以新实例为leader。</code></pre></div>
<blockquote>
<p>case 3 kill一个kafka实例,kill P然后重启P,再kill C再重启C</p>
</blockquote>
<div><pre><code class="language-none">kill P且重启之后,P与C都可以正常工作。干掉C又重启之后,P与C依然能正常工作,但丢失消息若干且部分乱序。</code></pre></div>
<blockquote>
<p>case 4 新建一个topic,其partition为3,其replica为1,然后kill掉两个kafka实例</p>
</blockquote>
<div><pre><code class="language-none">kill掉一个kafka实例后,这个topic的信息如下图:</code></pre></div>
<p><img src="../pic/kafka-topic-one-replica.png" alt="kafka-topic-one-replica"></p>
<div><pre><code class="language-none">所以kafka中topic的replica应该大于1。</code></pre></div>
<p>上面程序的相关代码详见<a href="https://github.com/AlexStocks/test/tree/master/kafka/kafka_failure_test">kafka failure test</a>。</p>
<p>不改变测试环境其他条件,仅改变topic的replica为1的情况下,再次以下测试:</p>
<blockquote>
<p>case 1 kill全部kafka实例,3分钟后再全部重启</p>
</blockquote>
<div><pre><code class="language-none">P与C依然能正常工作,但丢失消息若干且部分乱序。但如果P为confluent_kafka(以下简称CK)实现,则仅仅有消息乱序现象。</code></pre></div>
<blockquote>
<p>case 2 kill全部kafka实例,48分钟后再全部重启</p>
</blockquote>
<div><pre><code class="language-none">P与C依然能正常工作,但丢失消息若干。</code></pre></div>
<h3 id="toc_8">3 线上kafka集群服务恢复</h3>
<hr>
<p>第一次把线上那台死掉的机器重启后,它不断在重建数据,大约10分钟后仍然没有启动成功,目测是数据彻底乱掉了。于是我们把其数目录清空,然后再启动就成功了。</p>
<p>整个kafka集群服务恢复后,发现服务仍然很慢,通过日志发现这个kafka实例是在复制数据。这台机器从当天17:00pm开始一直到第二天08:00am才把数据重建成功,数据量约为598G,复制速率约为40G/H = 11.38KB/s。</p>
<p>到线上发现kafka数据保存时间配置如下:log.retention.hours=168,也就是保存了7天的数据。</p>
<p>参考上面的case4和这个参数,大约就知道优化方向了。</p>
<h3 id="toc_9">4 kafka消费者与broker连接不断挂掉</h3>
<hr>
<p>在上海一家做wifi软件的公司工作的时候遇到这样一个问题:kafka consumer(Java)与broker之间的连接总是不断挂掉,查看了consumer的源码(主要是poll函数)后,发现主要原因是:</p>
<div><pre><code class="language-none">consumer是单线程程序,从broker批量取出一批消息后处理,处理完毕后向broker汇报心跳,即messge process逻辑和heartbeat逻辑在一个线程上。</code></pre></div>
<p>解决方法是:设置max.partition.fetch.bytes=4096(kafka v0.9.0.0)或者max.poll.records=10(kafka v0.10.0.1),这两个参数是用来设置每次拉取消息的最大量。</p>
<p>通过缩小batch message size来缩短message process时间,从而不阻塞hearbeat上报时间,后面这种现象就再也没有发生了。</p>
<h3 id="toc_10">5 kafka使用建议及相关参数</h3>
<hr>
<h4 id="toc_11">5.1 kafka使用建议</h4>
<hr>
<ul>
<li>据B站服务端老大说经他们测试,partition数目为磁盘数目的4倍(也就是说每个盘上放4个partition)时候kafka的性能最优;</li>
<li>磁盘建议使用ssd,为了充分利用CPU提高系统吞吐率计(无论磁盘顺序写再怎么快也跟不上内存和cpu的吞吐率);</li>
<li><p>如果使用SATA或者ATA磁盘,为了提高吞吐率建议使用多个磁盘,通过log.dirs配置这些磁盘目录,如 “log.dirs=/mnt/diska,/mnt/diskb,/mnt/diskc”,但是相关优缺点自己权衡,根据参考文档2这个参数会导致kafka如下行为:</p>
<blockquote>
<p>The intention is to allow the use of multiple disks without RAID or
logical volume management.</p>
<p>It takes a comma separated list and partition replicas are randomly
distributed to the list.</p>
<p>If you get a disk error that results in an IOException the broker will shut itself down.</p>
</blockquote>
<p>另外,不要一个目录配置成ssd而另一个目录配置成SATA,否则会导致topic数据传输忽快忽慢;</p></li>
<li><p>磁盘上数据保留时间(相关参数是log.retention.hours=168)建议改为24小时或者你认为其他的合适值即可;</p></li>
<li><p>不要想当然认为kafka保存数据的过程是可靠的,broker接收收据后异步批量刷入磁盘的,为了保证数据及时写入磁盘,可以修改参数 “log.flush.interval.messages”(这个参数一般不要修改,过大则影响数据可靠性,过小则影响broker的吞吐率进而影响响应生产者和消费者的速度,详细解释见参考文档3);</p></li>
<li><p>worker数目最好与parition数目相等(小于当然也可以),鄙人自己测试当partiton数目为1而消费者为10的时候,系统响应速度急剧下降,可见消费者都把时间浪费在消息争用上了;</p></li>
<li><p>为了保证系统稳定性,replica数目最少为2;</p></li>
<li><p>生产者发送消息选择压缩方法的时候,建议选择lz4(详见参考文档1);</p></li>
<li><p>如果使用kafka的版本是v0.10以上,建议使用最新版kafka(目前是0.10.2.0),个人发现 v0.10.1.0 版本的jar包不能正确获取某个consumer group的消费者个数;</p></li>
<li><p>其实个人真心建议不要使用v0.10,使用v0.8 or v0.9即可,其中一个原因是kafka版本越新则其周围可用的工具越少,工具的更新速度实在比不上kafka版本的个更新速度,每个大版本的更新就意味着其架构的大改;</p></li>
<li><p>kafka v0.10的版本支持了offset存储在kafka上,但是他的offset提交处理速度非常慢,虽然支持异步定时提交offset,但是重启的话还是会丢,所以依赖kafka做主从同步保障数据一致性是不可能的(例如阿里的canal在mysql 集群间传递binlog式它们是绝对不会使用kafka的),也就说kafka不考虑消费者是否重复消费,当然也有大厂自己封装kafka后把每个consumer消费的offset存在别的中间件上,通过assign方式读取kafka消息来保证不重复消费kafka message;</p></li>
<li><p>不要使用github.com/wvanbergen/kafka/consumergroup包,这个包将近两年没有更新,在kafka v0.10.1.0上测试的时候发现其官方example程序不能正确建立consumer group,建议以github.com/bsm/sarama-cluster替代之;</p></li>
<li><p>kafka使用的网卡流量到达极限的70%后,就开始大量丢包;</p></li>
<li><p>kafka streaming的statestore是通过rocksdb实现的;</p></li>
<li><p>kafka数据的顺序性、重复性和完整性(是否会丢)在发送端是没有保证的,<a href="http://docs.confluent.io/2.0.0/clients/producer.html">官方文档</a>对此有详细描述。这里只谈到retries参数(librdkafka:message.send.max.retries),它是在发送失败的情况设置重新发送次数,但是熟悉消息系统的人知道:一旦有消息发送重试就可能导致本来在生产者端上顺序在前的消息A和B到了broker之后顺序却是B和A,如果要确保在broker上消息顺序也是A和B,那么可以设置max.in.flight.requests.per.connection参数为1,它确保生产者发送A成功之前不会发送B,即每次只发送一个消息(生产者的吞吐率就没法保证了);</p></li>
<li><p>目前的kafka的负载均衡只考虑磁盘负载均衡没有考虑网卡流量,譬如有的topic虽然数据少但是消费者比较多,此时网卡就没有均衡,但即使是磁盘均衡也做到不够好,它的负载均衡是以partition为维度的,但多topic的parition数据量不可能相等;</p></li>
</ul>
<h5 id="toc_12">5.1.1 broker</h5>
<hr>
<ul>
<li><p>Kafka中Topic命名规范: appname_feature_function</p>
<p>如ikurento<em>push</em>sched</p></li>
<li><p>partition数目</p>
<p>partition数目多少并不会严重影响broker性能,confluent官方层测试过10000个partition的情况。</p></li>
</ul>
<h4 id="toc_13">5.2 kafka最优参数</h4>
<hr>
<p>在Kafka Beijing Meetup(3rd)上,胡夕给出了下图各个参数集:</p>
<p><img src="../pic/kafka-best-params.jpg" alt=""></p>
<p>下面分别解释下各个参数的意义。</p>
<h5 id="toc_14">5.2.1 Broker</h5>
<hr>
<ul>
<li>log.retention.hours - 日志保存时间 (hours)。还有一个参数log.retention.bytes(日志最大字节数),bytes和minutes无论哪个先达到都会启动相应的淘汰策略</li>
<li>num.network.threads - 处理网络请求的最大线程数</li>
<li>auto.leader.rebalance.enable - 是否在broker端启动partition-rebalance线程,检查Leader分配是否平衡</li>
<li>leader.imbalance.check.interval.seconds - 周期性检查Leader分配是否平衡时间间隔</li>
<li>replica.fetch.max.bytes - replicas每次获取数据的最大字节数</li>
<li>connections.max.idle.ms - 链接超时时间,如果链接idle时间超过这个时间则会被broker关闭</li>
<li>unclean.leader.election.enable - 是否允许leader死掉的情况下,不具备ISR选举资格的replicas被选为leader</li>
<li>min.insync.replicas - 设定ISR中的最小副本数是多少,默认值为1。当且仅当request.required.acks参数设置为-1时,此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required</li>
<li>max.connections.per.ip - 每个ip地址上每个broker可以被连接的最大数目</li>
<li>max.connections.per.ip.overrides - 配置针对某个特别的IP or hostname的连接个数最大限制,配置样例见<a href="https://issues.apache.org/jira/browse/KAFKA-1512">#KAFKA-512</a></li>
<li><p>offsets.topic.replication.factor - Topic _<em>consumer</em>offsets的replica值,这个值默认为1,这是因为如果cluster只有一个kafka的情况下让系统跑起来,详细说明见<a href="https://issues.apache.org/jira/browse/KAFKA-1846">KAFKA-1846</a></p>
<p><font color=blue></p>
<p>如果不修改offsets.topic.replication.factor的值,则_<em>consumer</em>offsets的replica为1,如果某个partition的leader broker宕机,那就只能去无语对苍天了。所以预防的方法就是在config/server.properties中设置offsets.topic.replication.factor=3。那么,如果忘记修改offsets.topic.replication.factor的值,有什么补救补救办法,总不能眼睁睁看着悲剧发生吧?</p>
<p>办法总是有的。可以通过kafka提供的重新分配分区工具 bin/kafka-reassign-partitions.sh 修改_<em>consumer</em>offsets的replica,操作步骤如下:
1 请先准备重新分配分区配置文件replica.json:</p>
<div><pre><code class="language-none">{"version":1,
"partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,0]},
{"topic":"__consumer_offsets","partition":2,"replicas":[2,0,1]},
]}</code></pre></div>
<p>2 通过如下命令执行扩容:</p>
<div><pre><code class="language-none">./bin/kafka-reassign-partitions.sh --zookeeper $zk --reassignment-json-file replica.json --execute</code></pre></div>
<p>3 查看扩容结果:</p>
<div><pre><code class="language-none">./bin/kafka-reassign-partitions.sh --zookeeper $zk --reassignment-json-file replica.json --verify</code></pre></div>
<p></font></p></li>
<li><p>offsets.topic.num.partitions - Topic _<em>consumer</em>offsets的partition值,默认为50。</p></li>
</ul>
<h5 id="toc_15">5.2.2 Producer</h5>
<hr>
<ul>
<li>batch.size - 批处理消息字节数。如果某个消息大于这个值则不会被处理</li>
<li>linger.ms - 发送延迟时间。producer端的消息发送线程到record buffer中获取消息然后立即发送,不管消息字节数是否达到batch.size,此时如果消息的数量太少就会影响吞吐率,linger参数的意义就是让发送线程发现发送消息总量太小的时候再等待ling.ms时间后再启动发送动作</li>
<li>compression.type - 压缩类型,目前最恰当的type就是lz4。当启动压缩算法后,将导致producer端消息处理时间过长,为了增大吞吐率就需要调整上面两个参数值</li>
<li>max.in.flight.requests.per.connection - 发送多少消息后等待broker端的回复。这个参数和retry配合使用,会影响消息的顺序,详细意义请参考5.1章节的内容</li>
<li>max.request.size - 请求的最大字节数。这个设置会限制producer每次批量发送请求的数目,以防发出大量的请求</li>
<li>request.required.acks - 设置数据发送数据请求的可靠性的级别。在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效,具体含义见参考文档5。</li>
<li>session.timeout.ms - 在恶劣网络环境下要注意放大这个值,以防止producer不稳定。</li>
</ul>
<h5 id="toc_16">5.2.3 Consumer</h5>
<hr>
<ul>
<li>fetch.message.max.bytes - 单次fetch消息的最大字节数。Producer端的max.message.bytes = broker端的replica.fetch.max.bytes = 消费者的fetch.message.max.bytes,这三个值一起控制了单个消息的最大长度</li>
<li>max.poll.records - 限制每回poll返回的最大数据条数。前面已经说到,fetch.message.max.bytes在v.10里面被max.poll.records替换掉,另外v.10版本中heartbeat不再在poll中触发,而是由单独的线程来完成,详细见<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread">KIP-62</a>。</li>
<li>num.consumer.fetchers - 用于fetch数据的fetcher线程数</li>
<li>auto.commit.enable - 是否自动提交offset</li>
<li>partition.assignment.strategy - <code>参考文献6</code> 建议将这个值设为 “ org.apache.kafka.clients.consumer.RoundRobinAssignor”,因为 <code>默认的策略(Range partitioning)会导致partition分配不均,如果采用默认的策略,当consumer(logstash数量*worker数量)的数量大于topic partition数量时,partition总是只会被分配给固定的一部分consumer</code>,这个策略本质是一种跨topic的shuffle,保证一个consumer group内消费多个topic时,总体保持负载均衡。关于两种策略的更详细区别请见 <code>参考文档7</code> 和 <code>参考文档14</code>。<br>
RoundRobinAssignor策略使用有以下要求:<br>
<ul>
<li>同一个Consumer Group里面的所有消费者的num.streams必须相等;<br></li>
<li>同一个Consumer Group内每个消费者订阅的主题必须相同。</li>
</ul></li>
</ul>
<p>参照MirrorMaker,这两个要求很好理解。</p>
<h5 id="toc_17">5.2.4 Os & Jvm & Mem</h5>
<hr>
<ul>
<li>linux文件系统最好使用ext3 or ext4</li>
<li>commit interval(page buffer commit)改为60s</li>
<li>vm.swappiness为0,等于告诉os尽量使用内存空间,然后才是磁盘swap的虚拟内存空间。否则os将频繁进行进程调度, 磁盘使用的波动就会比较大</li>
<li>JVM的heap区域设置为4G,kafka官方推荐6G。因为会消耗28-30g的文件系统缓存。</li>
<li>JVM的gc算法尽量使用CMS</li>
</ul>
<h5 id="toc_18">5.2.5 CPU</h5>
<hr>
<ul>
<li>CPU尽量选择多核。参考文档4认为:kafka不算是CPU消耗型服务,在主频和CPU核数之间,选择后者,将会获得多核带来的更好的并发处理性能。</li>
</ul>
<h5 id="toc_19">5.2.6 disk</h5>
<hr>
<ul>
<li>推荐使用RAID。在kafka broker中配置多目录,每个目录配置在不同的磁盘上。参考文档4 不建议使用SSD,但是经个人测试confluent kafka producer向128G的ssd上运行的kafka每秒可压测出23.8GB/s的结果,而在1T SATA上运行的kafka只能压测出4.9MB/s的结果。</li>
</ul>
<h3 id="toc_20">6 kafka lastest feature list</h3>
<hr>
<h4 id="toc_21">6.1 broker</h4>
<hr>
<ul>
<li>kafka 2017 7月份0.11版本发布后,可以在broker上进行<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">消息重复过滤</a>,增强事务处理能力;</li>
</ul>
<h4 id="toc_22">6.2 consumer</h4>
<hr>
<ul>
<li>kafka 目前最新版本(0.10.2)已经添加<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index">OffsetsForTime功能</a>,在consumer端有API <a href="https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)">offsetsForTimes</a>,可以获取某个时间范围内的消息的offset集合;</li>
</ul>
<h3 id="toc_23">7 kafka toolset</h3>
<hr>
<p>kafka自身不建议跨IDC部署,可以使用confluent的官方工具MirrorMaker或者uber开源的工具uReplicator把数据在跨IDC之间进行复制。uReplicator是基于MirrorMaker做的二次开发,但是uReplicator不能很好的支持kafka 0.10以上版本,暂不考虑。</p>
<h4 id="toc_24">7.1 MirrorMaker</h4>
<hr>
<p>MirrorMaker自身支持把多个kafka集群的数据复制到一个目的地kafka集群中。关于MirrorMake怎么使用请详细参考8到13,但是请注意,这里面很多参数是针对kafka 0.9的,在kafka 0.10里面已经无用,具体哪些无用,自己多测试即可。</p>
<p>下面重点说明conf/mirror-producer.properties的“partition.assignment.strategy”和MirrorMaker的”num.streams”。</p>
<p>“partition.assignment.strategy”关系到consumer的负载均衡策略,默认的range策略会导致负载不均衡且consumer group内consumer的个数多于topic partition的数目时多余的consumer处于idle状态无法工作, <code>参考文档6</code> 和 <code>参考文档11</code> 都提到这个问题,具体原因参见 <code>参考文档7</code> 和 <code>参考文档14</code>。采用RoundRobin策略后,所有的consumer都会处于工作状态,加快数据复制速度。</p>
<p>num.streams则是指定consumers的个数。经个人测试,共两个kafka topic,每个topic partition数目为36时,实际测试环境中num.streams给定值为40,而MirrorMaker的实际consumer的个数为72,把num.streams改为100后consumer个数也是100,所以二者不一定一致。而当consumer个数为100,只消费一个topic的时候,有36个consumer线程处于工作状态,其他线程都处于空闲状态(如下图)。所以num.streams的值应该设置为所有topic的partition数目之和。</p>
<p><img src="../pic/mirror-maker-partition.png" alt="consumer-partition"></p>
<p>至于MirrorMaker的性能还可以,想kafka集群写入13993021个长度为100B的kv后,MirrorMaker两分钟内复制完毕。</p>
<p>至于MirrorMaker的缺点,参考文档13提到 <code>当 MirrorMaker 节点发生故障时,数据复制延迟较大,对于动态添加 topic 则需要重启进程、黑白名单管理完全静态等。虽然 uReplicator 针对 MirrorMaker 进行了大量优化,但在我们的大量测试之后仍遇到众多问题,我们需要具备动态管理 MirrorMaker 进程的能力,同时我们也不希望每次都重启 MirrorMaker进程</code>,至于MirrorMaker有哪些深坑,囿于测试条件限制,个人无法给出定论。</p>
<p>个人的看法是,一个topic启动一个MirrorMaker,<code>参考文档12</code>说为了系统稳定性和高可用,建议“Always run more than one mirror-maker processes. Make sure you use the same groupId in consumer config.”。</p>
<p>至于MirrorMaker 集群自身的稳定性,<code>参考文档12</code> 认为可以通过检查MirrorMaker的consumer group的lag值来验证,确实是一个好办法。</p>
<h2 id="toc_25">参考文档</h2>
<ul>
<li>1 <a href="http://blog.yaorenjie.com/2015/03/27/Kafka-Compression-Performance-Tests/">Kafka Compression Performance Tests</a></li>
<li>2 <a href="http://grokbase.com/t/kafka/users/136mjfz5bg/new-log-dirs-property-as-opposed-to-log-dir">[Kafka-users] new log.dirs property (as opposed to log.dir)
</a></li>
<li>3 <a href="http://blog.csdn.net/lizhitao/article/details/25667831">apache kafka系列之server.properties配置文件参数说明</a></li>
<li>4 <a href="http://www.jianshu.com/p/8689901720fd">某互联网大厂kafka最佳实践</a></li>
<li>5 <a href="http://www.bijishequ.com/detail/381629?p=71">kafka数据可靠性深度解读</a> - <em>唯品会出品,里面关于“Leader选举”一节写的比较详细,尤其是“leader选举的算法非常多,比如Zookeeper的Zab、Raft以及Viewstamped Replication。而Kafka所使用的leader选举算法更像是微软的PacificA算法”这句话</em></li>
<li>6 <a href="https://mp.weixin.qq.com/s/onrBwQ0vyLJYWD_FRnNjEg">B站日志系统的前世今生</a></li>
<li>7 <a href="https://www.iteblog.com/archives/2209.html">Kafka分区分配策略</a></li>
<li>8 <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring">Kafka mirroring</a></li>
<li>9 <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330">Kafka mirroring (MirrorMaker)</a></li>
<li>10 <a href="https://kafka.apache.org/documentation.html#basic_ops_mirror_maker">basic<em>ops</em>mirror_maker</a></li>
<li>11 <a href="https://mp.weixin.qq.com/s/tDbSpypnVOVK6203lZ758w">Kafka MirrorMaker使用与性能调优全解析</a></li>
<li>12 <a href="https://community.hortonworks.com/articles/79891/kafka-mirror-maker-best-practices.html">Kafka Mirror Maker Best Practices</a></li>
<li>13 <a href="http://www.sohu.com/a/211335858_262549">百亿访问量的监控平台如何炼成?</a></li>
<li>14 <a href="http://blog.csdn.net/zhanyuanlin/article/details/76021614">Kafka为Consumer分派分区:RangeAssignor和RoundRobinAssignor</a></li>
</ul>
<h2 id="toc_26">扒粪者-于雨氏</h2>
<ul>
<li>2017/02/02,于雨氏,于致真大厦。</li>
<li>2017/02/19,于雨氏,于致真大厦,添加replica为1条件下的测试结果。</li>
<li>2017/03/02,于雨氏,于致真大厦,添加“kafka使用建议”。</li>
<li>2017/03/25,于雨氏,于致真大厦,补充“kafka启动与无法连接kafka问题若干”一节。</li>
<li>2017/03/25,于雨氏,于致真大厦,补充“使用建议”一节。</li>
<li>2017/05/01,于雨氏,于致真大厦,根据kafka beijing meetup(3rd)添加5.1&5.2。</li>
<li>2017/05/04,于雨氏,于致真大厦,添加“kafka lastest feature list”一章。</li>
<li>2018/01/17,于雨氏,于海淀,添加MirrorMaker一节和consumer”partition.assignment.strategy“参数说明。</li>
</ul>
<div id="disqus_thread"></div>
<script type="text/javascript">
// disqus comment js block, added on 2016/02/10
// https://segmentfault.com/a/1190000002807674
var disqus_shortname = 'alexstocks'; // Required - Replace example with your forum shortname
var disqus_identifier = window.location.pathname; //'a unique identifier for each page where Disqus is present';
var disqus_title = document.title; // 'a unique title for each page where Disqus is present';
var disqus_url = document.URL; // window.location.origin + window.location.pathname; // 'a unique URL for each page where Disqus is present';
var disqus_config = function () {
this.page.url = window.location.href; // Replace PAGE_URL with your page's canonical URL variable
this.page.identifier = window.location.pathname; // Replace PAGE_IDENTIFIER with your page's unique identifier variable
};
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js'; dsq.setAttribute('data-timestamp', +new Date());
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
<noscript>Please enable JavaScript to view the <a href="https://disqus.com/?ref_noscript" rel="nofollow">comments powered by Disqus.</a></noscript>
<script id="dsq-count-scr" src='//' + disqus_shortname + '.disqus.com/count.js' async></script>
</body>
</html>