Kafka 为新创建的主题提供了很多默认参数配置。可以通过管理工具为每个主题单独配置一些参数。还可以将服务器提供的默认配置作为基准,应用于集群内的大部分主题。

1、num.partitions

该参数指定了新创建的主题将包含多少个分区,如果启用了自动创建主题的功能,自动创建的主题的分区数量就是此参数指定的值。

默认值为1,即一个分区。

可以增加主题的分区数量,但是不能减少。

Kafka 集群通过分区来实现主题的横向伸缩。在有新的 broker 加入到集群中时,通过分区数来实现集群的负载均衡将变得十分重要。

可以将主题的分区数量设置为 broker 数量的倍数,可以让分区均衡地分布到 broker 上,进而均衡地分布消息负载。

下面是一个创建多个分区 topic 的示例。kafka 集群有5个节点,创建具有10个分区的主题,分区复制因子为1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 10 --topic test-topic-partition

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe test-topic-partition
Topic: test-topic-partition TopicId: 5tzFvVZvT7qtjnVcK_DB0w PartitionCount: 10 ReplicationFactor: 1 Configs:
Topic: test-topic-partition Partition: 0 Leader: 201 Replicas: 201 Isr: 201
Topic: test-topic-partition Partition: 1 Leader: 204 Replicas: 204 Isr: 204
Topic: test-topic-partition Partition: 2 Leader: 202 Replicas: 202 Isr: 202
Topic: test-topic-partition Partition: 3 Leader: 203 Replicas: 203 Isr: 203
Topic: test-topic-partition Partition: 4 Leader: 205 Replicas: 205 Isr: 205
Topic: test-topic-partition Partition: 5 Leader: 201 Replicas: 201 Isr: 201
Topic: test-topic-partition Partition: 6 Leader: 204 Replicas: 204 Isr: 204
Topic: test-topic-partition Partition: 7 Leader: 202 Replicas: 202 Isr: 202
Topic: test-topic-partition Partition: 8 Leader: 203 Replicas: 203 Isr: 203
Topic: test-topic-partition Partition: 9 Leader: 205 Replicas: 205 Isr: 205

在考虑分区数量时,需要考虑如下因素

  • 主题需要达到多大的吞吐量?
  • 从单个分区读取数据的最大吞吐量是多少?通常每个分区都会有一个消费者读取。如果消费者将数据写入数据库的速度不会超过 50 MBps,那么也就知道一了从一个分区读取数据的吞吐量不需要超过 50 MBps
  • 生产者向单个分区写入数据的吞吐量。生产者的速度通常比消费者快得多,估算吞吐量时,可以为生产者多估算一些。
  • 如果消息是按照不同的键写入分区,以后很难在已有主题新增分区,所以要根据未来的预期使用量而不是当前的使用量来估算吞吐量。
  • 每个 broker 包含的分区数、可用的磁盘空间和网络带宽
  • 避免使用太多分区,每个分区都会占用 broker 的内存和其他资源,还会增加元数据更新和首领选举的时间
  • 是否需要镜像数据?大型分区可能会成为镜像的瓶颈
  • 使用的是云服务的话,需要考虑虚拟机或磁盘的 IOPS(每秒输入/输出操作)限制

综上所述,如果要向主题写入和从主题读取 1 GBps 的数据,并且每个消费者可以处理 50 MBps 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到 1 GBps 的吞吐量。

如果无法估算,根据经验来讲,将分区每天保留的数据限制在 6 GB 以内可以获得比较理想的效果

2、default.replication.factor

如果启用了自动创建主题的功能,此参数决定了新创建主题的复制系数。

建议将复制系数设置为至少比 min.insync.replicas 大1的数。为了提升故障对抗能力,如果你有足够大的集群和足够多的硬件资源,则可以将复制系数设置为比 min.insync.replicas 大 2 的数(简写为 RF++)。RF++ 让集群维护变得更加容易,也能更好地防止停机,因为它允许集群内同时发生一次计划内停机和一次计划外停机。对于典型的集群,这意味着每个分区至少要有 3 个副本。如果在滚动部署或升级 Kafka 或底层操作系统期间出现网络交换机中断、磁盘故障或其他计划外的问题,你可以保证仍然有 1 个副本可用。

3、log.retention.ms

Kafka 通常根据配置的时间长短来决定数据可以被保留多久。

配置数据保留时长的配置有:

  • log.retention.hours,默认为 168 小时,也就是 1 周
  • log.retention.minutes
  • log.retention.ms

推荐使用 log.retention.ms,如果同时指定了上面的三个参数,Kafka 将会优先使用最小单位的那个。

根据时间保留数据是通过检查日志片段文件的最后修改时间来实现的。一般来说,最后修改时间就是日志片段的关闭时间,也就是文件中最后一条消息的时间戳。不过,如果使用管理工具在服务器间移动分区,那么最后修改时间就不准确了,这种误差可能会导致这些分区过多地保留数据。

4、log.retention.bytes

数据保留策略也可以通过计算已保留的消息的字节数量来判断旧消息是否过期。

该参数对应的是每一个分区。如果一个主题包含 8 个分区,log.retention.bytes 设置为 1 GB,那么该主题最多可以保留 8 GB 的数据。

如果配置此参数,当主题增加了分区数量,那么主题可以保留的数据也会随之增加。如果此配置改为 -1,那么分区就可以无限制地保留数据。

当同时设置根据时间保留数据和字节保留数据,那么只要任意一个条件得到满足,消息就会被删除。为了简单起见,建议只选择其中的一种保留策略。

5、log.segment.bytes

当消息到达 broker 时,它们会被追加到分区的当前日志片段上。当日志片段大小达到 log.segment.bytes 指定的上限(默认值是 1073741824,即1 GB)时,当前日志片段会被关闭,一个新的日志片段会被打开。

一旦日志片段被关闭,就可以开始进入过期倒计时。这个参数的值越小,关闭和分配新文件就会越频繁,从而降低整体的磁盘写入效率。

如果主题的消息量不是很大,此参数的配置将变得很重要。如果一个主题每天只接受 100 MB 的数据,此配置使用默认 1 GB设置,那么填满一个日志片段将需要 10 天。如果过期时间设置为 1 周,那么此消息片段将最多需要 17 天才会过期。

日志片段的大小也会影响使用时间戳获取偏移量的行为。当使用时间戳获取日志偏移量时,Kafka 会查找在指定时间戳写入的日志片段文件,也就是创建时间在指定时间戳之前且最后修改时间在指定时间戳之后的文件。然后,Kafka 会返回这个日志片段开头的偏移量(也就是文件名)。

6、log.roll.ms

该配置指定多长时间之后日志片段可以被关闭。该配置与 log.segment.bytes 并不互斥。

日志片段会在大小或时间达到上限时被关闭。

默认情况下,该配置没有设定值,使用的是 log.roll.hours 配置设定的值,默认为 168 个小时,即一周。

7、min.insync.replicas

为了提升集群的数据持久性,可以将 min.insync.replicas 设置为 2,确保至少有两个副本跟生产者保持“同步”。

生产者需要配合将 ack 设置为 all,这样就可以确保至少有两个副本(首领和另一个副本)确认写入成功,从而防止在以下情况下丢失数据:首领确认写入,然后发生停机,所有权被转移到一个副本,但这个副本没有写入成功。如果没有这些配置,则生产者会认为已经写入成功,但实际上消息丢失了。

不过,这样做是有副作用的,因为需要额外的开销,所以效率会有所降低。因此,对于能够容忍偶尔消息丢失的高吞吐量集群,不建议修改这个参数的默认值。

默认值为 1。

配置场景示例:创建一个复制因子为 3 的主题,将 min.insync.replicas 设置为 2,并将生产者的 acks 设置为 "all"。这将确保如果大多数副本未接收到写入,生产者将引发异常。

8、message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单条消息的大小,默认值是 1 000 000,也就是 1MB。

如果生产者尝试发送超过这个大小的消息,那么不仅消息不会被 broker 接收,还会收到 broker 返回的错误信息。与其他 broker 配置参数一样,这个参数指的是压缩后的消息大小,也就是说,消息的实际大小可以远大于 message.max.bytes,只要压缩后小于这个值即可。

这个参数对性能有显著的影响。值越大,负责处理网络连接和请求的线程用在处理请求上的时间就越长。它还会增加磁盘写入块的大小,从而影响 I/O 吞吐量。

如何协调服务器端和客户端之间的消息大小配置?消费者客户端设置的 fetch.max.bytes 需要与服务端设置的消息大小保持一致。如果 fetch.max.bytes 的值比此配置的值小,那么消费者就无法读取比较大的消息,进而造成阻塞,无法继续处理消息。在配置 broker 的 replica.fetch.max.bytes 参数时,也遵循同样的原则。

相关链接

Apache Kafka

OB tags

#Kafka