I. Using ZooKeeper

1. Setting Up a ZooKeeper Cluster

In the early Kafka architecture, ZooKeeper played a key role, managing and coordinating the Kafka cluster. The relationship between Kafka and ZooKeeper breaks down as follows:

  • Metadata management: ZooKeeper stores and manages the Kafka cluster's metadata, including topics, partitions, offsets, and consumer group information.
  • Partition leader election: when the leader of a Kafka partition fails, ZooKeeper elects a new leader, keeping the data highly available.
  • Cluster state monitoring and coordination: ZooKeeper maintains the membership list of the Kafka broker cluster and helps track each broker's state and location.
  • Coordination services: Kafka uses ZooKeeper for coordination tasks such as consumer group rebalancing and partition assignment.

For setting up the ZooKeeper cluster itself, see: Docker 中部署 ZooKeeper 集群 | z2huo

2. Kafka Docker Configuration and Verification

2.1 Docker Configuration Files at a Glance

2.1.1 The Compose File
name: kafka-cluster-demo

networks:
  kafka-net:
    name: kafka-cluster-net
    driver: bridge
  zookeeper-net:
    name: zookeeper-cluster-net
    external: true

volumes:
  kafka-volume-1:
    name: kafka-cluster-demo-volume-1
  kafka-volume-2:
    name: kafka-cluster-demo-volume-2
  kafka-volume-3:
    name: kafka-cluster-demo-volume-3
  kafka-volume-4:
    name: kafka-cluster-demo-volume-4
  kafka-volume-5:
    name: kafka-cluster-demo-volume-5

services:
  broker-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-1
    hostname: ${host_name}-1
    env_file:
      - ./kafka-cluster.env
    networks:
      - kafka-net
      - zookeeper-net
    ports:
      - 127.0.0.1:19094:9094
    volumes:
      - kafka-volume-1:/bitnami/kafka
    environment:
      KAFKA_CFG_BROKER_ID: 201
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://${host_name}-1:9092,EXTERNAL://127.0.0.1:19094

  broker-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-2
    hostname: ${host_name}-2
    env_file:
      - ./kafka-cluster.env
    networks:
      - kafka-net
      - zookeeper-net
    ports:
      - 127.0.0.1:19095:9094
    volumes:
      - kafka-volume-2:/bitnami/kafka
    environment:
      KAFKA_CFG_BROKER_ID: 202
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://${host_name}-2:9092,EXTERNAL://127.0.0.1:19095

  # ......
  broker-5:
    # ......

The cluster consists of 5 nodes in total; the Compose file above omits the last three.

2.1.2 The kafka-cluster.env File
# ZooKeeper cluster configuration
KAFKA_CFG_ZOOKEEPER_CONNECT=zoo-1:2181,zoo-2:2181,zoo-3:2181,zoo-4:2181,zoo-5:2181/kafka-cluster-demo
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=18000

# inter.broker.listener.name selects which listener configuration brokers use when talking to each other
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

# Kafka listener configuration
KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

KAFKA_CFG_NUM_NETWORK_THREADS=3
KAFKA_CFG_NUM_IO_THREADS=8
KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES=102400
KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES=102400
KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600

# The bitnami image has its own data location; don't use Kafka's default
# KAFKA_CFG_LOG_DIRS=/tmp/kafka-logs

# Number of partitions for newly created topics
KAFKA_CFG_NUM_PARTITIONS=5

KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR=1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_CFG_LOG_RETENTION_HOURS=720
KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0

# Whether to auto-create a topic when one doesn't exist
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

# Keep topic leadership from piling up on one broker; true keeps leadership as balanced as possible across the cluster
KAFKA_CFG_AUTO_LEADER_REBALANCE_ENABLE=true

# Whether topic deletion is allowed; true enables it
KAFKA_CFG_DELETE_TOPIC_ENABLE=true
2.1.3 The .env File
image=bitnami/kafka:3.6.1
container_name=kafka-broker
host_name=kafka
restart=unless-stopped

2.2 Configuration Explained

2.2.1 Using a chroot Path in the ZooKeeper Address

Kafka's ZooKeeper connection setting becomes KAFKA_CFG_ZOOKEEPER_CONNECT: zoo-1:2181,zoo-2:2181,zoo-3:2181,zoo-4:2181,zoo-5:2181/kafka-cluster-demo, with /kafka-cluster-demo appended at the end of the connect string.

Without a chroot path, Kafka brokers register their nodes directly under the root of the ZooKeeper tree, for example:

$ docker exec -it 93d213f61b79 /bin/bash
$ zkCli.sh
$ ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
$ ls /brokers/ids
[1]

With a chroot path, the path named by the chroot (kafka-cluster-demo) is created under the root instead, and everything goes below it:

$ ls /
[kafka-cluster-demo, zookeeper]
$ ls /kafka-cluster-demo
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
$ ls /kafka-cluster-demo/brokers/ids
[1]

This keeps all of the Kafka cluster's configuration data under one path, which makes it easier to manage in ZooKeeper; several Kafka clusters can then share one ZooKeeper ensemble, each under its own chroot.
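The chroot path is simply everything from the first `/` onward in the connect string. A minimal shell sketch of the split, using this demo's connect string:

```shell
# ZooKeeper connect string from kafka-cluster.env; the chroot follows the last host
conn='zoo-1:2181,zoo-2:2181,zoo-3:2181,zoo-4:2181,zoo-5:2181/kafka-cluster-demo'

# Everything before the first '/' is the ensemble; the rest is the chroot path
hosts="${conn%%/*}"
chroot="/${conn#*/}"

echo "ensemble: $hosts"
echo "chroot:   $chroot"   # → chroot:   /kafka-cluster-demo
```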

2.2.2 Choosing an Image

There are two images to choose from:

  • bitnami/kafka, a Verified Publisher image
  • apache/kafka, a Sponsored OSS image

The two images handle configuration files and environment variables differently. With bitnami/kafka, even after a server.properties file is bind-mounted, environment variables specified in the Compose file override the corresponding settings in server.properties; the mechanism is examined in detail later in the KRaft section.

2.2.3 The listeners, advertised.listeners, and inter.broker.listener.name Settings

The listeners property defines the network interfaces and ports the Kafka broker actually listens on; it specifies the addresses on which the broker accepts connections. This is the address the broker binds to internally.

The advertised.listeners property specifies the addresses the broker advertises to the outside; these are the addresses clients use to connect to the broker. It is typically needed when the broker has different interfaces or addresses on internal and external networks. The addresses in advertised.listeners are the public addresses reachable by clients and by other brokers.

Setting inter.broker.listener.name makes Kafka brokers use one specific listener (such as INTERNAL) for broker-to-broker communication. Its value must match one of the listener names configured in listeners.
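Putting the three settings together: with host_name=kafka from the .env file, broker-1 above effectively ends up with the fragment below (a sketch assembled here from the Compose file and kafka-cluster.env, not copied from a running container):

```shell
# Write the effective listener settings for broker-1 to a scratch file, for illustration only
cat > /tmp/broker-1-listeners.properties <<'EOF'
# Bind addresses: the broker listens on all interfaces, ports 9092 and 9094
listeners=INTERNAL://:9092,EXTERNAL://:9094
# Advertised addresses: other brokers connect via kafka-1:9092, host clients via 127.0.0.1:19094
advertised.listeners=INTERNAL://kafka-1:9092,EXTERNAL://127.0.0.1:19094
# Brokers talk to each other over the INTERNAL listener
inter.broker.listener.name=INTERNAL
EOF
cat /tmp/broker-1-listeners.properties
```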

2.3 Verifying the ZooKeeper Connection After the Containers Start

2.3.1 Checking with the ZooKeeper CLI

Enter one of the nodes of the ZooKeeper cluster.

$ docker exec -it 93d213f61b79 /bin/bash

Use the zkCli.sh tool to check whether Kafka has successfully connected to the ZooKeeper cluster.

$ zkCli.sh
......
$ ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
$ ls /brokers/ids
[1]

The listing of /brokers/ids above shows the IDs of the Kafka brokers that have registered.

Note that before the Kafka cluster is started, the ls / output contains only [zookeeper].

Note: if Kafka connects to the ZooKeeper cluster through a chroot path, ls / shows only two nodes under the root:

$ ls /
[kafka-cluster-demo, zookeeper]
2.3.2 Checking with the zookeeper-shell.sh Tool

Alternatively, Kafka's bundled zookeeper-shell.sh tool can connect to ZooKeeper and inspect the nodes Kafka created.

$ docker exec -it 7394557dcfec /bin/bash
$ zookeeper-shell.sh zoo-1:2181
$ ls /brokers/ids

2.4 Verifying the Kafka Cluster

2.4.1 Verifying by Creating a Topic
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test-topic
Created topic test-topic.

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic
[2024-08-12 16:28:54,743] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient)
Topic: test-topic TopicId: X0odWG6LT8G_GBN-8bAG4Q PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 11 Replicas: 11 Isr: 11 Elr: N/A LastKnownElr: N/A

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic
Topic: test-topic TopicId: O-DrlT3KQHKX7dUMIOn0rw PartitionCount: 5 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 201 Replicas: 201 Isr: 201
Topic: test-topic Partition: 1 Leader: 204 Replicas: 204 Isr: 204
Topic: test-topic Partition: 2 Leader: 202 Replicas: 202 Isr: 202
Topic: test-topic Partition: 3 Leader: 203 Replicas: 203 Isr: 203
Topic: test-topic Partition: 4 Leader: 205 Replicas: 205 Isr: 205

Looking at the ZooKeeper nodes with zkCli.sh, the test-topic metadata now exists in ZooKeeper:

$ zkCli.sh
$ ls /kafka-cluster-demo/brokers/topics
test-topic
2.4.2 Producing and Consuming Messages on the Topic
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
>111
>222
>333
>444
>555

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
111
222
333
444
555

2.5 Verifying with a Java Program

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

    public static void main(String[] args) {
        String topic = "test-topic";
        String bootstrapServers = "127.0.0.1:19094,127.0.0.1:19095,127.0.0.1:19096,127.0.0.1:19097,127.0.0.1:19098";
        String groupId = "my-consumer-group";

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("enable.auto.commit", "true");

        // Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        try (consumer) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList(topic));
            // Poll for and process messages in a loop
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: Key = %s, Value = %s, Offset = %d%n", record.key(), record.value(), record.offset());
                }
            }
        }
    }
}

Output:

Received message: Key = null, Value = 111, Offset = 0
Received message: Key = null, Value = 222, Offset = 1
Received message: Key = null, Value = 333, Offset = 2
Received message: Key = null, Value = 444, Offset = 3
Received message: Key = null, Value = 555, Offset = 4

II. Using Kafka Raft (KRaft)

As Kafka evolved, the community developed a new architecture, **KRaft (Kafka Raft)**, designed to remove the dependency on ZooKeeper. KRaft mode uses an internal Raft consensus protocol in place of ZooKeeper for distributed management and coordination of metadata.

Benefits of migrating to KRaft:

  • Simpler architecture: removing the dependency on an external ZooKeeper cluster simplifies deploying and operating Kafka.
  • Better performance: with streamlined internal processing, KRaft delivers higher throughput and lower latency.
  • Stronger consistency: KRaft's Raft protocol provides stronger consistency and faster leader election.

1. Docker Configuration Files at a Glance

1.1 The Compose File

name: kafka-cluster-kraft-demo

networks:
  kafka-net:
    name: kafka-cluster-kraft-net
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.99.0.0/16
          ip_range: 172.99.0.0/16
          gateway: 172.99.0.1

volumes:
  kafka-volume-controller-1:
    name: kafka-cluster-kraft-demo-volume-controller-1
  kafka-volume-controller-2:
    name: kafka-cluster-kraft-demo-volume-controller-2
  kafka-volume-controller-3:
    name: kafka-cluster-kraft-demo-volume-controller-3
  kafka-volume-broker-1:
    name: kafka-cluster-kraft-demo-volume-broker-1
  kafka-volume-broker-2:
    name: kafka-cluster-kraft-demo-volume-broker-2
  kafka-volume-broker-3:
    name: kafka-cluster-kraft-demo-volume-broker-3
  kafka-volume-broker-4:
    name: kafka-cluster-kraft-demo-volume-broker-4
  kafka-volume-broker-5:
    name: kafka-cluster-kraft-demo-volume-broker-5

services:
  controller-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-1
    hostname: controller-1
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.1
    ports:
      - 9301:9093
    volumes:
      - kafka-volume-controller-1:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 101
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-1:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9301
      # Enable debug mode for more detailed log output
      BITNAMI_DEBUG: true

  controller-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-2
    hostname: controller-2
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.2
    ports:
      - 9302:9093
    volumes:
      - kafka-volume-controller-2:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 102
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-2:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9302
      BITNAMI_DEBUG: true

  controller-3:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-3
    hostname: controller-3
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.3
    ports:
      - 9303:9093
    volumes:
      - kafka-volume-controller-3:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 103
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-3:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9303
      BITNAMI_DEBUG: true

  broker-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-1
    hostname: broker-1
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.1
    ports:
      - 9201:9094
    volumes:
      - kafka-volume-broker-1:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 111
      KAFKA_CFG_LISTENERS: BROKER://broker-1:9092,CLIENT://broker-1:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-1:9092,CLIENT://${host_ip}:9201
      # Enable debug mode for more detailed log output
      BITNAMI_DEBUG: true

  broker-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-2
    hostname: broker-2
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.2
    ports:
      - 9202:9094
    volumes:
      - kafka-volume-broker-2:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 112
      KAFKA_CFG_LISTENERS: BROKER://broker-2:9092,CLIENT://broker-2:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-2:9092,CLIENT://${host_ip}:9202
      BITNAMI_DEBUG: true

  broker-3:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-3
    hostname: broker-3
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.3
    ports:
      - 9203:9094
    volumes:
      - kafka-volume-broker-3:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 113
      KAFKA_CFG_LISTENERS: BROKER://broker-3:9092,CLIENT://broker-3:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-3:9092,CLIENT://${host_ip}:9203
      BITNAMI_DEBUG: true

  broker-4:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-4
    hostname: broker-4
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.4
    ports:
      - 9204:9094
    volumes:
      - kafka-volume-broker-4:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 114
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-4:9092,CLIENT://${host_ip}:9204
      KAFKA_CFG_LISTENERS: BROKER://broker-4:9092,CLIENT://broker-4:9094
      BITNAMI_DEBUG: true

  broker-5:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-5
    hostname: broker-5
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.5
    ports:
      - 9205:9094
    volumes:
      - kafka-volume-broker-5:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 115
      KAFKA_CFG_LISTENERS: BROKER://broker-5:9092,CLIENT://broker-5:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-5:9092,CLIENT://${host_ip}:9205
      BITNAMI_DEBUG: true

The Kafka cluster is built with 3 controller nodes and 5 broker nodes.

1.2 The .env File

image=bitnami/kafka:4.0.0
container_name=kafka-broker-kraft
restart=unless-stopped
# Host machine IP
host_ip=192.168.99.99

The bitnami/kafka Docker image is used.

1.3 Environment Variable Files

1.3.1 Controller Environment Variables
# The role(s) of this Kafka node: controller, broker, or both controller and broker
KAFKA_CFG_PROCESS_ROLES=controller

# Comma-separated list of listener names used by the controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

# Security protocol for each listener
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT

# Controller IDs and endpoints: a static controller quorum (the legacy approach)
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=101@controller-1:9093,102@controller-2:9093,103@controller-3:9093
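Each entry in the quorum voters list has the form id@host:port; a quick shell sketch of how the list decomposes:

```shell
# Static quorum voters string from controller.env
voters='101@controller-1:9093,102@controller-2:9093,103@controller-3:9093'

# Split on commas, then split each entry into node ID and endpoint
for v in ${voters//,/ }; do
  id="${v%%@*}"
  endpoint="${v#*@}"
  echo "node ${id} -> ${endpoint}"
done
```

This prints one `node <id> -> <host:port>` line per voter; the IDs must match the KAFKA_CFG_NODE_ID values of the controller services in the Compose file.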
1.3.2 Broker Environment Variables
# The role(s) of this Kafka node: controller, broker, or both controller and broker
KAFKA_CFG_PROCESS_ROLES=broker

# Listener name used for broker-to-broker communication
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER

# Comma-separated list of listener names used by the controllers
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

# Security protocol for each listener
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:PLAINTEXT

# Controller IDs and endpoints: a static controller quorum (the legacy approach)
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=101@controller-1:9093,102@controller-2:9093,103@controller-3:9093
1.3.3 Common Environment Variables
# Cluster ID; this Kafka image requires it
KAFKA_KRAFT_CLUSTER_ID=kafka-cluster-kraft-demo

1.4 The properties Files

1.4.1 Controller Configuration File

Well, the controller doesn't need any extra configuration properties for now.

1.4.2 Broker Configuration File
# Whether topic deletion is allowed; true enables it
delete.topic.enable=true

# Whether to auto-create a topic when one doesn't exist
auto.create.topics.enable=true

# Number of partitions for newly created topics
# i.e. how many partitions a new topic gets, particularly when topic auto-creation is enabled
# Set to 5 to match the number of brokers
# Not sure why, but only auto-created topics seem to apply this setting, e.g. a topic created implicitly by producing to a nonexistent topic
# whereas a topic created with kafka-topics.sh --bootstrap-server broker-1:9092 --create --topic test-topic
# gets only 1 partition when the --partitions flag is omitted
num.partitions=5

offsets.topic.num.partitions=50

# Default replication factor; a broker-level setting
default.replication.factor=3

# Use the default of 3
offsets.topic.replication.factor=3

# Minimum number of replicas that must stay "in sync" for producers
min.insync.replicas=2

# Threads per data directory for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=8

# How long to retain message logs: about 30 days
log.retention.ms=2592100000
# log.retention.hours=720

# Limit on the size of a single message
message.max.bytes=1048576

# Size at which a log segment is closed; the default is 1073741824 (1 GB), set here to 100 MB
log.segment.bytes=104857600

# Keep topic leadership from piling up on one broker; true keeps leadership as balanced as possible across the cluster
auto.leader.rebalance.enable=true

log.retention.check.interval.ms=300000

group.initial.rebalance.delay.ms=0

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

2. How Environment Variables and Configuration Files Interact in the Docker Image

The image used here is bitnami/kafka, which supports Docker environment variables and Kafka configuration files at the same time; settings from environment variables ultimately override those in the configuration file. The mechanism is implemented in shell scripts baked in when the image is built. The location of the Kafka configuration file also matters: there are several places a configuration file can live, and the shell scripts behave differently depending on where it is.

The Docker Compose file contains the following mount:

volumes:
  - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original

The local broker.properties file is mounted to /opt/bitnami/kafka/config/server.properties.original in the container. After the container starts, the shell scripts copy server.properties.original to a new server.properties, then append the settings from environment variables to server.properties; for duplicated keys, the environment variable's value overrides the original value in server.properties.
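The replace-or-append behavior can be sketched with a small shell function in the spirit of bitnami's kafka_common_conf_set (a hypothetical simplification; the real function also handles escaping, commented-out keys, and more):

```shell
# Replace a key's value if the key exists, otherwise append the key=value line
conf_set() {
  local file="$1" key="$2" value="$3"
  if grep -q "^${key}=" "$file"; then
    sed -i "s|^${key}=.*|${key}=${value}|" "$file"
  else
    echo "${key}=${value}" >> "$file"
  fi
}

# Demo: start from a "mounted" properties file, then apply two env-style overrides
printf 'num.partitions=5\ndelete.topic.enable=true\n' > /tmp/server.properties
conf_set /tmp/server.properties num.partitions 10        # existing key: replaced in place
conf_set /tmp/server.properties log.retention.hours 720  # new key: appended
cat /tmp/server.properties
```

The final cat shows num.partitions=10, delete.topic.enable=true, log.retention.hours=720: the existing key was overridden where it stood and the new key was appended at the end.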

Let's start with the Dockerfile:

COPY rootfs /

# ......

RUN /opt/bitnami/scripts/kafka/postunpack.sh

# ......

ENTRYPOINT [ "/opt/bitnami/scripts/kafka/entrypoint.sh" ]
CMD [ "/opt/bitnami/scripts/kafka/run.sh" ]

postunpack.sh contains the following:

# Move the original server.properties, so users can skip initialization logic by mounting their own server.properties directly instead of using the MOUNTED_CONF_DIR
mv "${KAFKA_CONF_DIR}/server.properties" "${KAFKA_CONF_DIR}/server.properties.original"

At image build time, the bundled server.properties is renamed to server.properties.original, so /opt/bitnami/kafka/config/ contains no server.properties by default; if one exists there at runtime, it can only have been mounted from the Compose file.

entrypoint.sh contains the following, which runs setup.sh:

if [[ "$*" = *"/opt/bitnami/scripts/kafka/run.sh"* || "$*" = *"/run.sh"* ]]; then
    info "** Starting Kafka setup **"
    /opt/bitnami/scripts/kafka/setup.sh
    info "** Kafka setup finished! **"
fi

setup.sh contains the following:

# Map Kafka environment variables
kafka_create_alias_environment_variables

# Dynamically set node.id/broker.id/controller.quorum.voters if the _COMMAND environment variable is set
kafka_dynamic_environment_variables
# Set the default truststore locations before validation
kafka_configure_default_truststore_locations
# Ensure Kafka user and group exist when running as 'root'
am_i_root && ensure_user_exists "$KAFKA_DAEMON_USER" --group "$KAFKA_DAEMON_GROUP"
# Ensure directories used by Kafka exist and have proper ownership and permissions
for dir in "$KAFKA_LOG_DIR" "$KAFKA_CONF_DIR" "$KAFKA_MOUNTED_CONF_DIR" "$KAFKA_VOLUME_DIR" "$KAFKA_DATA_DIR"; do
    if am_i_root; then
        ensure_dir_exists "$dir" "$KAFKA_DAEMON_USER" "$KAFKA_DAEMON_GROUP"
    else
        ensure_dir_exists "$dir"
    fi
done

# Kafka validation, skipped if server.properties was mounted at either $KAFKA_MOUNTED_CONF_DIR or $KAFKA_CONF_DIR
[[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" && ! -f "$KAFKA_CONF_FILE" ]] && kafka_validate
# Kafka initialization, skipped if server.properties was mounted at $KAFKA_CONF_DIR
[[ ! -f "$KAFKA_CONF_FILE" ]] && kafka_initialize

setup.sh calls a number of functions that process the Docker environment variables; they all live in libkafka.sh. The kafka_initialize function called here performs the configuration copying and substitution; before it runs, the script checks whether the file $KAFKA_CONF_FILE exists. Some of the path variables take the following values:

export BITNAMI_ROOT_DIR="/opt/bitnami"
export BITNAMI_VOLUME_DIR="/bitnami"

export KAFKA_BASE_DIR="${BITNAMI_ROOT_DIR}/kafka"
export KAFKA_VOLUME_DIR="/bitnami/kafka"
export KAFKA_DATA_DIR="${KAFKA_VOLUME_DIR}/data"
export KAFKA_CONF_DIR="${KAFKA_BASE_DIR}/config"
export KAFKA_CONF_FILE="${KAFKA_CONF_DIR}/server.properties"
export KAFKA_MOUNTED_CONF_DIR="${KAFKA_MOUNTED_CONF_DIR:-${KAFKA_VOLUME_DIR}/config}"
export KAFKA_CERTS_DIR="${KAFKA_CONF_DIR}/certs"
export KAFKA_INITSCRIPTS_DIR="/docker-entrypoint-initdb.d"
export KAFKA_LOG_DIR="${KAFKA_BASE_DIR}/logs"
export KAFKA_HOME="$KAFKA_BASE_DIR"
export PATH="${KAFKA_BASE_DIR}/bin:${BITNAMI_ROOT_DIR}/java/bin:${PATH}"

The kafka_initialize function is as follows:

kafka_initialize() {
    info "Initializing Kafka..."

    # Check for mounted configuration files
    if ! is_dir_empty "$KAFKA_MOUNTED_CONF_DIR"; then
        cp -Lr "$KAFKA_MOUNTED_CONF_DIR"/* "$KAFKA_CONF_DIR"
    fi

    if [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" ]]; then
        info "No injected configuration files found, creating default config files"
        # Restore original server.properties but remove Zookeeper/KRaft specific settings for compatibility with both architectures
        cp "${KAFKA_CONF_DIR}/server.properties.original" "$KAFKA_CONF_FILE"

        kafka_configure_from_environment_variables

        # ......

        # Settings for each Kafka Listener are configured individually
        read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
        for protocol_map in "${protocol_maps[@]}"; do

            # ......

        done
        # Configure Kafka using environment variables
        # This is executed at the end, to allow users to override properties set by the initialization logic
        kafka_configure_from_environment_variables
    else
        info "Detected mounted server.properties file at ${KAFKA_MOUNTED_CONF_DIR}/server.properties. Skipping configuration based on env variables"
    fi
    true
}

The kafka_configure_from_environment_variables function is as follows:

kafka_configure_from_environment_variables() {
    # List of special cases to apply to the variables
    local -r exception_regexps=(
        "s/sasl\.ssl/sasl_ssl/g"
        "s/sasl\.plaintext/sasl_plaintext/g"
    )
    # Map environment variables to config properties
    for var in "${!KAFKA_CFG_@}"; do
        key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')"

        # Exception for the camel case in this environment variable
        [[ "$var" == "KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET" ]] && key="zookeeper.clientCnxnSocket"

        # Apply exception regexps
        for regex in "${exception_regexps[@]}"; do
            key="$(echo "$key" | sed "$regex")"
        done

        value="${!var}"
        kafka_server_conf_set "$key" "$value"
    done
}

The function loops over every environment variable of the form KAFKA_CFG_NUM_PARTITIONS, converts the name into the corresponding property key (num.partitions), and calls kafka_server_conf_set to write the property into the target file. kafka_server_conf_set is:

kafka_server_conf_set() {
    kafka_common_conf_set "$KAFKA_CONF_FILE" "$@"
}
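The name conversion itself is easy to try standalone; a sketch reusing the exact sed/tr pipeline from kafka_configure_from_environment_variables:

```shell
# Convert a KAFKA_CFG_* variable name to a Kafka property key,
# mirroring the pipeline in kafka_configure_from_environment_variables
to_property_key() {
  echo "$1" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]'
}

to_property_key KAFKA_CFG_NUM_PARTITIONS             # → num.partitions
to_property_key KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE  # → auto.create.topics.enable
```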

That is how appending and overriding properties between environment variables and the configuration file is implemented. It's a surprisingly involved mechanism; I don't know whether every Docker image bitnami publishes, such as Redis or PostgreSQL, has the same feature. If they all do, that's genuinely nice: some official images I've used before only accept configuration files, and I never explored whether they let the two coexist. Also, some of the shell syntax here is hard for me to parse, which makes reading it a slog.

3. Cluster Testing

# Create a topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --replication-factor 3 --partitions 5 --topic test-topic
Created topic test-topic.

$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --partitions 5 --topic test-topic

$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --topic test-topic
Created topic test-topic.

# Delete a topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --delete --topic test-topic

# List topics
$ kafka-topics.sh --bootstrap-server broker-1:9092 --list

# Describe the created topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --describe --topic test-topic
Topic: test-topic TopicId: zFA0MFqBSKmmDYXuoda9Bw PartitionCount: 5 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=104857600,retention.ms=2592000000,max.message.bytes=1048576
Topic: test-topic Partition: 0 Leader: 112 Replicas: 112,113,114 Isr: 112,113,114 Elr: LastKnownElr:
Topic: test-topic Partition: 1 Leader: 113 Replicas: 113,114,115 Isr: 113,114,115 Elr: LastKnownElr:
Topic: test-topic Partition: 2 Leader: 114 Replicas: 114,115,111 Isr: 114,115,111 Elr: LastKnownElr:
Topic: test-topic Partition: 3 Leader: 115 Replicas: 115,111,112 Isr: 115,111,112 Elr: LastKnownElr:
Topic: test-topic Partition: 4 Leader: 111 Replicas: 111,112,113 Isr: 111,112,113 Elr: LastKnownElr:

# Producer test
$ kafka-console-producer.sh --bootstrap-server broker-1:9092 --topic test-topic
>111
>222
>333
>444
>555

# Consumer test
$ kafka-console-consumer.sh --bootstrap-server broker-1:9092 --topic test-topic --from-beginning
111
222
333
444
555

# Inspect the internal topic __consumer_offsets
$ kafka-topics.sh --bootstrap-server broker-1:9092 --describe --topic __consumer_offsets

# Describe a consumer group
$ kafka-consumer-groups.sh --bootstrap-server broker-1:9092 --group my-consumer-group --describe

III. Miscellaneous

1. Building Your Own Kafka Image

2. Listener Configuration

Related Links

kafka/docker/examples/README.md at trunk · apache/kafka (github.com)

bitnami/kafka - Docker Image | Docker Hub

apache/kafka - Docker Image | Docker Hub

Docker 中部署 ZooKeeper 集群 | z2huo

Kafka Broker 配置 | z2huo

[[Docker 中部署 ZooKeeper 集群]]

[[Broker 配置]]

[[chroot path]]

OB tags

#Docker #Kafka #ZooKeeper #未完待续