Kafka集群部署方案&参数推荐配置
集群部署选择
操作系统
Linux
在以下3个方面表现更胜一筹:
I/O
模型使用- 数据网络传输效率
- 社区支持度
Kafka
客户端底层使用了Java
的Selector
,Selector
在Linux
上实现的机制是epoll
,能够获得更高效的I/O
性能。
Kafka
部署在Linux
上能够享受的零拷贝(Zero Copy)
技术,即当数据在磁盘和网络进行传输是避免昂贵的内核态数据拷贝从而实现快速数据传输。
I/O模型和零拷贝会单拎出来写文章。
磁盘
由于Kafka
采用的是顺序写,所以一定程度上规避了HDD
最大的缺点,即随机写操作慢。
HDD
因易损坏造成的可靠性差等缺陷,又由Kafka
在软件层面提供机制来保证,因此大部分场景下使用HDD
性价比很高。
Kafka
在软件层面自行实现了负载均衡,随然很多大厂确实是把Kafka
底层的存储交由RAID,但目前Kfafka
在存储这方面提供了越来越便捷的高可靠性方案,
生产中使用RAID
似乎不是那么重要了。
磁盘容量
假设某个业务一天发1亿条消息到Kafka
,每条消息存2份防止丢失,消息默认保存2周,消息平均大小为1KB。
计算:
每天1亿条,每条1KB,每条2份,保存2周,即:
每天的数据量:1亿 * 1KB * 2 / 1000 / 1000 = 200GB
,另外Kafka
集群除了消息数据还有其他类型的数据,比如索引数据等,再为这些数据留出10%的空间。
因此每天数据量就是220GB。保存两周,即 220GB * 14 ≈ 3TB
,Kafka
支持数据压缩,假设压缩比是0.75,最后要规划的容量是0.75 * 3 = 2.25TB
。
通过上面的分析可以总结,规划磁盘容量时需要考虑以下几个元素:
新增消息数
消息留存时间
平均消息大小
备份数
是否启用压缩
带宽
Kafka
是一种通过网络大量进行数据传输的工具,因此带宽特别容易成为瓶颈。
普通的以太网络,带宽主要有:1Gbps千兆网络(单机带宽),10Gbps万兆网络。
与其说是带宽规划,实际上是Kafka
服务器数量规划。
假设机房环境是千兆网络,某业务目标是1小时内处理1TB业务数据。
计算:
带宽是1Gbps,即每秒处理1G的数据,假设每台Kafka
都是单机部署,通常情况下Kafka
会用到70%的带宽资源,因为其他进程可能会占用一些带宽资源。
即单机部署Kafka
,每台服务器可以用到700Mb的带宽,但是实际上,这只是它能使用的最大带宽,你不能让Kafka
服务器常规性使用这么多资源,故通常要再额外预留出2/3的资源,即单台服务器使用带宽700Mb/3≈240Mbps
**(为follower拉取留一些带宽)**。
这个2/3实际上是相当保守的,需要根据实际情况酌情减少此值。
1T=1024GB,1GB=1024MB,1MB=1024KB,1KB=1024B(字节)
1TB=1024 * 1024MB = 8 * 1024 * 1024Mb位
那么8 * 1024 * 1024Mb / 3600s= 2330Mbps
2330Mbps/234Mbps
约等于 10台服务器,如果每天需要2个副本,那么总共需要30台服务器。
A_NATE_👻:
我们曾经也认为用普通硬盘就行,换成普通硬盘导致生产者堵塞写入负载偏高,换成SSD就没事了,我们每天消息数大概50亿。
胡夕: 专栏里面只是给出一个评估的方法。具体还要结合自己的实际情况来调整。通常我们认为SSD的顺序写TPS大约是HDD的4倍。
除了纵向扩展使用SSD之外,也可以尝试一下横向扩展,增加更多的broker或HDD分散负载:)
胡夕:没有谈及CPU,是因为通常情况下Kafka不太占用CPU,因此没有这方面的最佳实践出来。
但有些情况下Kafka broker是很耗CPU的:
- server和client使用了不同的压缩算法;
- server和client版本不一致造成消息格式转换;
- broker端解压缩校验
其中前两个都能规避,第三个目前无法规避。不过相比带宽资源,CPU通常都不是瓶颈。
集群参数配置
Broker端参数
Broker
端参数也被成为静态参数(Static Configs)
,即必须在Kafka的配置文件server.properties
中进行配置的参数,增删改都需要重启Broker
才能生效。
log.dirs
指定了Broker
需要使用的若干个文件目录路径。没有默认值,需要亲自指定。log.dir
用来补充上一个参数
这两个参数设置第一个即可,并且一定要为其配置多个路径,CSV格式,逗号分隔,比如/home/kakfa1,/home/kafka2,/home/kafka3
。
有条件的话,最好保证目录挂载到不同物理磁盘,好处:
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
能够实现故障转移:即
Failover
。Kafka
1.1 开始,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker
还能正常工作。这个改进正是舍弃 RAID 方案的基础:如果没有这种Failover
,只能依靠 RAID 来提供保障。
zookeeper.connect
CSV格式 指定格式zk1:2181,zk2:2181,zk3:2181
,如果想让两套Kafka
集群使用同一套Zookeeper
集群:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2
listeners
:监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。advertise.listeners
:和listeners
比多了个advertised
。Advertised
的含义表示宣称的、公布的,就是说这组监听器是Broker
用于对外发布的。
监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。
这里的协议名称可能是标准的名字,比如 PLAINTEXT
表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;
也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092
。
listener.security.protocol.map
一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map
参数告诉这个协议底层使用了哪种安全协议,比如指定istener.security.protocol.map=CONTROLLER:PLAINTEXT
表示CONTROLLER
这个自定义协议底层使用明文不加密传输数据。
建议:最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。 Broker 源代码中也使用的是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
auto.create.topics.enable
:是否支持自动创建topic
。建议为false
,避免生产上出现很多未知topic
unclean.leader.election.enable
:是否支持脏选举。建议显式设置为false
,只能让ISR
选举,后果就是出问题的分区不可用。设置为true
会引起数据丢失。auto.leader.rebalance.enable
:是否允许定期进行Leader
选举。设置它的值为true
表示允许Kafka
定期地对一些Topic
分区进行Leader
重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中Leader
选举的最大不同在于,它不是选Leader
,而是换Leader
!比如Leader A
一直表现得很好,但若auto.leader.rebalance.enable=true
,那么有可能一段时间后Leader A
就要被强行卸任换成Leader B
。换一次Leader
代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换Leader
本质上没有任何性能收益,因此建议在生产环境中把这个参数设置成false
。log.retention.{hours|minutes|ms}
:控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。比如log.retention.hours
=168表示保存7天。(不同Topic 根据自身业务需要,设置自己的留存时间。如果只能设置全局Broker
参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时应该设置 Topic 级别参数把它覆盖。)log.retention.bytes
:这是指定Broker
为消息保存的总磁盘容量大小。默认值-1,即想保存多少就多少。message.max.bytes
:控制Broker
能够接收的最大消息大小。默认值1000012,不到1MB,太小。根据自身业务调大。
第一片心意:
auto.leader.rebalance.enable
关于这个参数的设置,我有一点不同的意见,官网说的是如果某个broker挂了,那分布在他上的leader副本就会自动切换到其他活着的broker上,但是挂掉的broker重启之后,集群并不会将他之前的leader副本再切换回来,这样就会使其他broker上leader副本数较多,而该broker上无leader副本(无新主题创建),从而造成负载不均衡的情况。
这时我们可以通过kafka-preferred-replica-election.sh
脚本来重新平衡集群中的leader副本。但是我们配置这个参数为true的话,controller角色就会每五分钟(默认)检查一下集群不平衡的状态,进而重新平衡leader副本。胡夕: 同意。不过实际上,线上环境贸然大面积迁移副本leader是非常有风险的事情:)
杨陆伟:
log.retention.bytes
这个参数是针对主题的吧?比如设置为100M,Kafka定期会把每个主题的日志数据留存到100M以下?胡夕: 这个参数既有broker端也有topic端,不过最终都是作用于topic的。另外算法上也不是简单的比较大小。举个例子:假设日志段大小是700MB,当前分区共有4个日志段文件,大小分别是700MB,700MB,700MB和1234B——显然1234B那个文件就是active日志段。此时该分区总的日志大小是3*700MB+1234B=2100MB+1234B,如果阈值设置为2000MB,那么超出阈值的部分就是100MB+1234B,小于日志段大小700MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;反之如果阈值设置为1400MB,那么超过阈值的部分就是700MB+1234B > 700MB,此时Kafka会删除最老的那个日志段文件。
咸淡一首诗:
对于failover机制,kafka会新建副本,从leader处同步最新的数据给新建副本。如果坏掉的盘是leader持久化的盘并且其他副本没有来的及从坏掉的leader分区同步最新数据,重新选举leader后岂不是也会丢失数据???
胡夕: 是的,这种情况会丢失数据。其实Kafka并没有承诺不丢失数据,而是在满足某些条件下才做持久化保证。
Topic级别参数
Topic
级别参数会覆盖全局 Broker
参数的值,而每个 Topic
都能设置自己的参数值,这就是所谓的 Topic
级别参数。
retention.ms
:该Topic
消息被保存的时长。默认是 7 天,即该Topic
只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉Broker
端的全局参数值。retention.bytes
:规定了要为该Topic
预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka
集群中会有用武之地。默认值是 -1,表示可以无限使用磁盘空间。max.message.bytes
:决定了Kafka Broker
能够正常接收该Topic
的最大消息大小
JVM参数
无脑给出一个通用的建议:将JVM
堆大小设置成 6GB ,这是目前业界比较公认的一个合理值。很多人就是使用默认的 Heap Size
来跑 Kafka
,默认的 1GB 有点小,毕竟 Kafka Broker
在与客户端进行交互时会在 JVM
堆上创建大量的 ByteBuffer
实例,Heap Size
不能太小。
Java 8,那么可以手动设置使用 G1 收集器。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等。
KAFKA_HEAP_OPTS
:指定堆大小。KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。
操作系统参数
- 文件描述符限制:
ulimit -n
通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000
。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。 - 文件系统类型: 生产环境最好还是使用 XFS。最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
- Swappiness:一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,至少能够观测到 Broker 性能开始出现急剧下降,从而有进一步调优和诊断问题的时间。基于这个考虑,建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
- 提交时间:向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。小结
saup007:
修改 Topic 级 max.message.bytes,还要考虑以下两个吧?
还要修改 Broker的 replica.fetch.max.bytes 保证复制正常
消费还要修改配置 fetch.message.max.bytes
Hello world:
老师说的无脑配置给jvm heap 6G大小,这应该也看机器的吧,现在机器的内存也越来越大,我们这的机器都是64G 内存,配了16G的heap,老师觉得可以优化吗
胡夕: 虽然无脑推荐6GB,但绝不是无脑推荐>6GB。一个16GB的堆Full GC一次要花多长时间啊,所以我觉得6GB可以是一个初始值,你可以实时监控堆上的live data大小,根据这个值调整heap size。只是因为大内存就直接调整到16GB,个人觉得不可取。
另外堆越小留给页缓存的空间也就越大,这对Kafka是好事。