## I. Using ZooKeeper

### 1. Setting Up a ZooKeeper Cluster

In the early Kafka architecture, ZooKeeper played a key role, managing and coordinating the Kafka cluster. The relationship between Kafka and ZooKeeper breaks down as follows:
- **Metadata management**: ZooKeeper stores and manages the Kafka cluster's metadata, including topics, partitions, offsets, and consumer-group information.
- **Partition leader election**: when the leader of a Kafka partition fails, ZooKeeper elects a new leader, keeping the data highly available.
- **Cluster state monitoring and coordination**: ZooKeeper maintains the membership list of the Kafka broker cluster, tracking each broker's state and location.
- **Coordination services**: Kafka uses ZooKeeper for coordination tasks such as consumer-group rebalancing and partition assignment.
For setting up the ZooKeeper cluster itself, see: Docker 中部署 ZooKeeper 集群 | z2huo
### 2. Kafka Docker Configuration and Verification

#### 2.1 Docker Configuration Files at a Glance

##### 2.1.1 Compose file

```yaml
name: kafka-cluster-demo

networks:
  kafka-net:
    name: kafka-cluster-net
    driver: bridge
  zookeeper-net:
    name: zookeeper-cluster-net
    external: true

volumes:
  kafka-volume-1:
    name: kafka-cluster-demo-volume-1
  kafka-volume-2:
    name: kafka-cluster-demo-volume-2
  kafka-volume-3:
    name: kafka-cluster-demo-volume-3
  kafka-volume-4:
    name: kafka-cluster-demo-volume-4
  kafka-volume-5:
    name: kafka-cluster-demo-volume-5

services:
  broker-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-1
    hostname: ${host_name}-1
    env_file:
      - ./kafka-cluster.env
    networks:
      - kafka-net
      - zookeeper-net
    ports:
      - 127.0.0.1:19094:9094
    volumes:
      - kafka-volume-1:/bitnami/kafka
    environment:
      KAFKA_CFG_BROKER_ID: 201
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://${host_name}-1:9092,EXTERNAL://127.0.0.1:19094
  broker-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-2
    hostname: ${host_name}-2
    env_file:
      - ./kafka-cluster.env
    networks:
      - kafka-net
      - zookeeper-net
    ports:
      - 127.0.0.1:19095:9094
    volumes:
      - kafka-volume-2:/bitnami/kafka
    environment:
      KAFKA_CFG_BROKER_ID: 202
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://${host_name}-2:9092,EXTERNAL://127.0.0.1:19095
  broker-5:
```
Five broker nodes are created in total; the Compose file above omits the last three, which follow the same pattern as broker-1 and broker-2.
##### 2.1.2 The kafka-cluster.env file

```properties
# ZooKeeper cluster configuration
KAFKA_CFG_ZOOKEEPER_CONNECT=zoo-1:2181,zoo-2:2181,zoo-3:2181,zoo-4:2181,zoo-5:2181/kafka-cluster-demo
KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=18000

# inter.broker.listener.name specifies which listener the brokers in the cluster use for inter-broker communication
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

# Kafka listener configuration
KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

KAFKA_CFG_NUM_NETWORK_THREADS=3
KAFKA_CFG_NUM_IO_THREADS=8
KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES=102400
KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES=102400
KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600

# The bitnami image has its own data location; do not use Kafka's default
# KAFKA_CFG_LOG_DIRS=/tmp/kafka-logs

# Number of partitions for newly created topics
KAFKA_CFG_NUM_PARTITIONS=5
KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR=1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_CFG_LOG_RETENTION_HOURS=720
KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0

# Whether to auto-create a topic when one does not exist
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

# Keep partition leadership spread across brokers instead of concentrating on one;
# true keeps topic leadership as balanced as possible across the cluster
KAFKA_CFG_AUTO_LEADER_REBALANCE_ENABLE=true

# Whether topic deletion is enabled; true allows topics to be deleted
KAFKA_CFG_DELETE_TOPIC_ENABLE=true
```
##### 2.1.3 The .env file

```properties
image=bitnami/kafka:3.6.1
container_name=kafka-broker
host_name=kafka
restart=unless-stopped
```
#### 2.2 Configuration Notes

##### 2.2.1 Using a chroot path in the ZooKeeper connection string

The Kafka ZooKeeper connection is configured as `KAFKA_CFG_ZOOKEEPER_CONNECT: zoo-1:2181,zoo-2:2181,zoo-3:2181,zoo-4:2181,zoo-5:2181/kafka-cluster-demo`, with `/kafka-cluster-demo` appended to the end of the connection string.
Without a chroot path, Kafka brokers register themselves directly under the ZooKeeper root, for example:
```shell
$ docker exec -it 93d213f61b79 /bin/bash
$ zkCli.sh
$ ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
$ ls /brokers/ids
[1]
```
With a chroot path, the specified path `kafka-cluster-demo` is created under the root, as follows:
```shell
$ ls /
[kafka-cluster-demo, zookeeper]
$ ls /kafka-cluster-demo
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
$ ls /kafka-cluster-demo/brokers/ids
[1]
```
This keeps the Kafka cluster's configuration under a single namespace, which makes it easier to manage in ZooKeeper.
##### 2.2.2 Choosing an image

There are two candidate images:

- `bitnami/kafka` (Verified Publisher)
- `apache/kafka` (Sponsored OSS)

The two images handle configuration files and environment variables differently. With one of them (the bitnami image, examined in detail later), after a `server.properties` file is bind-mounted, environment variables specified in the Compose file still override the corresponding settings in `server.properties`.
##### 2.2.3 The listeners, advertised.listeners, and inter.broker.listener.name settings

`listeners` defines the network interfaces and ports the Kafka broker actually binds and listens on; it specifies the addresses on which the broker accepts connections.

`advertised.listeners` specifies the addresses the broker advertises to the outside world; these are the addresses clients use to connect to the broker. This setting is typically needed when the broker is reachable via different interfaces or addresses on internal and external networks. The addresses configured in `advertised.listeners` must be reachable by clients and by the other brokers.

`inter.broker.listener.name` makes the broker use a specific listener (such as `INTERNAL`) for broker-to-broker communication. Its value must match the name of one of the listeners configured in `listeners`.
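As a concrete illustration, this is roughly what the `KAFKA_CFG_*` settings for broker-1 above expand to as Kafka properties (a sketch of the effective configuration, not a file from the setup; the hostname `kafka-1` follows from `host_name=kafka` in `.env`):

```properties
# Bind on all interfaces, port 9092 (internal) and 9094 (external)
listeners=INTERNAL://:9092,EXTERNAL://:9094
# Advertise the Docker-network hostname internally and the published host port externally
advertised.listeners=INTERNAL://kafka-1:9092,EXTERNAL://127.0.0.1:19094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# Brokers talk to each other over the INTERNAL listener
inter.broker.listener.name=INTERNAL
```

Clients outside Docker connect through the published port `127.0.0.1:19094` and are told to keep using that address, while other brokers resolve `kafka-1:9092` on the Docker network.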
#### 2.3 Verifying the ZooKeeper Connection After the Containers Start

##### 2.3.1 Checking with the ZooKeeper CLI

Enter one of the nodes in the ZooKeeper cluster:
```shell
$ docker exec -it 93d213f61b79 /bin/bash
```
Use the `zkCli.sh` tool to check whether Kafka has successfully connected to the ZooKeeper cluster:
```shell
$ zkCli.sh
......
$ ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
$ ls /brokers/ids
[1]
```
The `ls /brokers/ids` output above lists the IDs of the registered Kafka brokers.
Also, before the Kafka cluster has been started, the `ls /` command above returns only `[zookeeper]`.
**Note**: if Kafka connects to the ZooKeeper cluster with a chroot path, `ls /` shows only two znodes under the root:
```shell
$ ls /
[kafka-cluster-demo, zookeeper]
```
##### 2.3.2 Checking with zookeeper-shell.sh

Kafka's own `zookeeper-shell.sh` tool can also connect to ZooKeeper and inspect the znodes Kafka created:
```shell
$ docker exec -it 7394557dcfec /bin/bash
$ zookeeper-shell.sh zoo-1:2181
$ ls /brokers/ids
```
#### 2.4 Verifying the Kafka Cluster

##### 2.4.1 Creating a topic

```shell
$ kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test-topic
Created topic test-topic.

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic
[2024-08-12 16:28:54,743] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient)
Topic: test-topic	TopicId: X0odWG6LT8G_GBN-8bAG4Q	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: test-topic	Partition: 0	Leader: 11	Replicas: 11	Isr: 11	Elr: N/A	LastKnownElr: N/A

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topic
Topic: test-topic	TopicId: O-DrlT3KQHKX7dUMIOn0rw	PartitionCount: 5	ReplicationFactor: 1	Configs:
	Topic: test-topic	Partition: 0	Leader: 201	Replicas: 201	Isr: 201
	Topic: test-topic	Partition: 1	Leader: 204	Replicas: 204	Isr: 204
	Topic: test-topic	Partition: 2	Leader: 202	Replicas: 202	Isr: 202
	Topic: test-topic	Partition: 3	Leader: 203	Replicas: 203	Isr: 203
	Topic: test-topic	Partition: 4	Leader: 205	Replicas: 205	Isr: 205
```
Looking at the znodes through `zkCli.sh`, ZooKeeper now contains the `test-topic` metadata:
```shell
$ zkCli.sh
$ ls /kafka-cluster-demo/brokers/topics
[test-topic]
```
##### 2.4.2 Producing and consuming messages

```shell
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
> 111
> 222
> 333
> 444
> 555

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
111
222
333
444
555
```
#### 2.5 Verifying with a Java Program

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {
    public static void main(String[] args) {
        String topic = "test-topic";
        String bootstrapServers = "127.0.0.1:19094,127.0.0.1:19095,127.0.0.1:19096,127.0.0.1:19097,127.0.0.1:19098";
        String groupId = "my-consumer-group";

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("enable.auto.commit", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        try (consumer) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: Key = %s, Value = %s, Offset = %d%n",
                            record.key(), record.value(), record.offset());
                }
            }
        }
    }
}
```
The output:
```text
Received message: Key = null, Value = 111, Offset = 0
Received message: Key = null, Value = 222, Offset = 1
Received message: Key = null, Value = 333, Offset = 2
Received message: Key = null, Value = 444, Offset = 3
Received message: Key = null, Value = 555, Offset = 4
```
## II. Using Kafka Raft

As Kafka evolved, the community developed a new architecture, **KRaft (Kafka Raft)**, designed to remove the dependency on ZooKeeper. KRaft mode uses an internal Raft consensus protocol in place of ZooKeeper for distributed metadata management and coordination.

Benefits of migrating to KRaft:

- **Simpler architecture**: removing the external ZooKeeper cluster simplifies Kafka deployment and management.
- **Better performance**: with optimized internal handling, KRaft offers higher throughput and lower latency.
- **Stronger consistency**: the Raft algorithm provides stronger consistency guarantees and faster leader election.
### 1. Docker Configuration Files at a Glance

#### 1.1 Compose file

```yaml
name: kafka-cluster-kraft-demo

networks:
  kafka-net:
    name: kafka-cluster-kraft-net
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.99.0.0/16
          ip_range: 172.99.0.0/16
          gateway: 172.99.0.1

volumes:
  kafka-volume-controller-1:
    name: kafka-cluster-kraft-demo-volume-controller-1
  kafka-volume-controller-2:
    name: kafka-cluster-kraft-demo-volume-controller-2
  kafka-volume-controller-3:
    name: kafka-cluster-kraft-demo-volume-controller-3
  kafka-volume-broker-1:
    name: kafka-cluster-kraft-demo-volume-broker-1
  kafka-volume-broker-2:
    name: kafka-cluster-kraft-demo-volume-broker-2
  kafka-volume-broker-3:
    name: kafka-cluster-kraft-demo-volume-broker-3
  kafka-volume-broker-4:
    name: kafka-cluster-kraft-demo-volume-broker-4
  kafka-volume-broker-5:
    name: kafka-cluster-kraft-demo-volume-broker-5

services:
  controller-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-1
    hostname: controller-1
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.1
    ports:
      - 9301:9093
    volumes:
      - kafka-volume-controller-1:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 101
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-1:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9301
      BITNAMI_DEBUG: true
  controller-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-2
    hostname: controller-2
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.2
    ports:
      - 9302:9093
    volumes:
      - kafka-volume-controller-2:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 102
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-2:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9302
      BITNAMI_DEBUG: true
  controller-3:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-controller-3
    hostname: controller-3
    env_file:
      - ./env/controller.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.1.3
    ports:
      - 9303:9093
    volumes:
      - kafka-volume-controller-3:/bitnami
      - ./config/controller.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 103
      KAFKA_CFG_LISTENERS: CONTROLLER://controller-3:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: CONTROLLER://${host_ip}:9303
      BITNAMI_DEBUG: true
  broker-1:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-1
    hostname: broker-1
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.1
    ports:
      - 9201:9094
    volumes:
      - kafka-volume-broker-1:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 111
      KAFKA_CFG_LISTENERS: BROKER://broker-1:9092,CLIENT://broker-1:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-1:9092,CLIENT://${host_ip}:9201
      BITNAMI_DEBUG: true
  broker-2:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-2
    hostname: broker-2
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.2
    ports:
      - 9202:9094
    volumes:
      - kafka-volume-broker-2:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 112
      KAFKA_CFG_LISTENERS: BROKER://broker-2:9092,CLIENT://broker-2:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-2:9092,CLIENT://${host_ip}:9202
      BITNAMI_DEBUG: true
  broker-3:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-3
    hostname: broker-3
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.3
    ports:
      - 9203:9094
    volumes:
      - kafka-volume-broker-3:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 113
      KAFKA_CFG_LISTENERS: BROKER://broker-3:9092,CLIENT://broker-3:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-3:9092,CLIENT://${host_ip}:9203
      BITNAMI_DEBUG: true
  broker-4:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-4
    hostname: broker-4
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.4
    ports:
      - 9204:9094
    volumes:
      - kafka-volume-broker-4:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 114
      KAFKA_CFG_LISTENERS: BROKER://broker-4:9092,CLIENT://broker-4:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-4:9092,CLIENT://${host_ip}:9204
      BITNAMI_DEBUG: true
  broker-5:
    image: ${image}
    restart: ${restart}
    container_name: ${container_name}-broker-5
    hostname: broker-5
    env_file:
      - ./env/broker.env
      - ./env/common.env
    networks:
      kafka-net:
        ipv4_address: 172.99.2.5
    ports:
      - 9205:9094
    volumes:
      - kafka-volume-broker-5:/bitnami/kafka/data
      - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
    environment:
      KAFKA_CFG_NODE_ID: 115
      KAFKA_CFG_LISTENERS: BROKER://broker-5:9092,CLIENT://broker-5:9094
      KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://broker-5:9092,CLIENT://${host_ip}:9205
      BITNAMI_DEBUG: true
```
The resulting Kafka cluster uses 3 controller nodes and 5 broker nodes.
#### 1.2 The .env file

```properties
image=bitnami/kafka:4.0.0
container_name=kafka-broker-kraft
restart=unless-stopped
# Host machine IP
host_ip=192.168.99.99
```
The `bitnami/kafka` Docker image is used.
#### 1.3 Environment Variable Files

##### 1.3.1 Controller environment file

```properties
# Role of the Kafka node: controller, broker, or both
KAFKA_CFG_PROCESS_ROLES=controller

# Comma-separated list of listener names used by the controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

# Security protocol for each listener
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT

# Controller IDs and endpoints: a static controller quorum (the legacy approach)
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=101@controller-1:9093,102@controller-2:9093,103@controller-3:9093
```
##### 1.3.2 Broker environment file

```properties
# Role of the Kafka node: controller, broker, or both
KAFKA_CFG_PROCESS_ROLES=broker

# Listener name used for inter-broker communication
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=BROKER

# Comma-separated list of listener names used by the controller
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

# Security protocol for each listener
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:PLAINTEXT

# Controller IDs and endpoints: a static controller quorum (the legacy approach)
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=101@controller-1:9093,102@controller-2:9093,103@controller-3:9093
```
##### 1.3.3 Common environment file

```properties
# Cluster ID; this setting is required by this Kafka image
KAFKA_KRAFT_CLUSTER_ID=kafka-cluster-kraft-demo
```
#### 1.4 properties Files

##### 1.4.1 Controller configuration file

Well, the controllers don't need any extra configuration properties for now.
##### 1.4.2 Broker configuration file

```properties
delete.topic.enable=true
auto.create.topics.enable=true
num.partitions=5
offsets.topic.num.partitions=50
default.replication.factor=3
offsets.topic.replication.factor=3
min.insync.replicas=2
num.recovery.threads.per.data.dir=8
log.retention.ms=2592100000
message.max.bytes=1048576
log.segment.bytes=104857600
auto.leader.rebalance.enable=true
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
```
### 2. How Environment Variables Relate to Configuration Files in the Image

The Kafka Docker image chosen here is `bitnami/kafka`. This image supports Docker environment variables and Kafka configuration files at the same time, and settings from environment variables ultimately override those in the configuration file. The mechanism is implemented by shell scripts baked into the image at build time. The Kafka configuration file can also live in several different locations, and the scripts behave differently depending on where it is placed.
The Docker Compose file contains the following mount:
```yaml
volumes:
  - ./config/broker.properties:/opt/bitnami/kafka/config/server.properties.original
```
The local `broker.properties` file is mounted into the container at `/opt/bitnami/kafka/config/server.properties.original`. When the container starts, the image's shell scripts copy `server.properties.original` to `server.properties` and then append the settings derived from environment variables to `server.properties`; for duplicated keys, the value from the environment variable overrides the one already in `server.properties`.
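The append-with-override semantics described above can be modeled in a few lines. This is a simplified sketch of my own, not the actual bitnami code: if the key already exists in the properties file, replace its value in place; otherwise append a new line.

```shell
# conf_set FILE KEY VALUE: replace an existing "KEY=..." line, or append one.
conf_set() {
    file="$1" key="$2" value="$3"
    if grep -q "^${key}=" "$file"; then
        # Key already present: env-var value wins, replace in place
        sed -i "s|^${key}=.*|${key}=${value}|" "$file"
    else
        # Key absent: append it
        echo "${key}=${value}" >> "$file"
    fi
}

printf 'num.partitions=1\nlog.retention.hours=168\n' > /tmp/server.properties
conf_set /tmp/server.properties num.partitions 5         # overrides the existing key
conf_set /tmp/server.properties delete.topic.enable true # appends a new key
cat /tmp/server.properties
```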
First, the Dockerfile:
```dockerfile
COPY rootfs /
RUN /opt/bitnami/scripts/kafka/postunpack.sh
ENTRYPOINT [ "/opt/bitnami/scripts/kafka/entrypoint.sh" ]
CMD [ "/opt/bitnami/scripts/kafka/run.sh" ]
```
`postunpack.sh` contains the following:
```shell
# Move the original server.properties, so users can skip initialization logic by mounting their own server.properties directly instead of using the MOUNTED_CONF_DIR
mv "${KAFKA_CONF_DIR}/server.properties" "${KAFKA_CONF_DIR}/server.properties.original"
```
After this build-time rename, `/opt/bitnami/kafka/config/` no longer contains a `server.properties`. If one exists there at container startup, it must have been mounted through the Compose file, and the scripts treat it as the user's own configuration named `server.properties.original`-style handling aside, skipping the environment-variable-based initialization.
`entrypoint.sh` contains the following, which runs `setup.sh`:
```shell
if [[ "$*" = *"/opt/bitnami/scripts/kafka/run.sh"* || "$*" = *"/run.sh"* ]]; then
    info "** Starting Kafka setup **"
    /opt/bitnami/scripts/kafka/setup.sh
    info "** Kafka setup finished! **"
fi
```
`setup.sh` contains:
```shell
# Map Kafka environment variables
kafka_create_alias_environment_variables

# Dynamically set node.id/broker.id/controller.quorum.voters if the _COMMAND environment variable is set
kafka_dynamic_environment_variables

# Set the default truststore locations before validation
kafka_configure_default_truststore_locations

# Ensure Kafka user and group exist when running as 'root'
am_i_root && ensure_user_exists "$KAFKA_DAEMON_USER" --group "$KAFKA_DAEMON_GROUP"

# Ensure directories used by Kafka exist and have proper ownership and permissions
for dir in "$KAFKA_LOG_DIR" "$KAFKA_CONF_DIR" "$KAFKA_MOUNTED_CONF_DIR" "$KAFKA_VOLUME_DIR" "$KAFKA_DATA_DIR"; do
    if am_i_root; then
        ensure_dir_exists "$dir" "$KAFKA_DAEMON_USER" "$KAFKA_DAEMON_GROUP"
    else
        ensure_dir_exists "$dir"
    fi
done

# Kafka validation, skipped if server.properties was mounted at either $KAFKA_MOUNTED_CONF_DIR or $KAFKA_CONF_DIR
[[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" && ! -f "$KAFKA_CONF_FILE" ]] && kafka_validate

# Kafka initialization, skipped if server.properties was mounted at $KAFKA_CONF_DIR
[[ ! -f "$KAFKA_CONF_FILE" ]] && kafka_initialize
```
The many helper functions that process Docker environment variables live in `libkafka.sh`. The `kafka_initialize` function performs the configuration copy and substitution; before it runs, the script checks whether the file `$KAFKA_CONF_FILE` already exists. Some of the relevant path variables:
```shell
export BITNAMI_ROOT_DIR="/opt/bitnami"
export BITNAMI_VOLUME_DIR="/bitnami"
export KAFKA_BASE_DIR="${BITNAMI_ROOT_DIR}/kafka"
export KAFKA_VOLUME_DIR="/bitnami/kafka"
export KAFKA_DATA_DIR="${KAFKA_VOLUME_DIR}/data"
export KAFKA_CONF_DIR="${KAFKA_BASE_DIR}/config"
export KAFKA_CONF_FILE="${KAFKA_CONF_DIR}/server.properties"
export KAFKA_MOUNTED_CONF_DIR="${KAFKA_MOUNTED_CONF_DIR:-${KAFKA_VOLUME_DIR}/config}"
export KAFKA_CERTS_DIR="${KAFKA_CONF_DIR}/certs"
export KAFKA_INITSCRIPTS_DIR="/docker-entrypoint-initdb.d"
export KAFKA_LOG_DIR="${KAFKA_BASE_DIR}/logs"
export KAFKA_HOME="$KAFKA_BASE_DIR"
export PATH="${KAFKA_BASE_DIR}/bin:${BITNAMI_ROOT_DIR}/java/bin:${PATH}"
```
The `kafka_initialize` function looks like this:
```shell
kafka_initialize() {
    info "Initializing Kafka..."
    # Check for mounted configuration files
    if ! is_dir_empty "$KAFKA_MOUNTED_CONF_DIR"; then
        cp -Lr "$KAFKA_MOUNTED_CONF_DIR"/* "$KAFKA_CONF_DIR"
    fi
    if [[ ! -f "${KAFKA_MOUNTED_CONF_DIR}/server.properties" ]]; then
        info "No injected configuration files found, creating default config files"
        # Restore original server.properties but remove Zookeeper/KRaft specific settings for compatibility with both architectures
        cp "${KAFKA_CONF_DIR}/server.properties.original" "$KAFKA_CONF_FILE"
        kafka_configure_from_environment_variables
        # ......
        # Settings for each Kafka Listener are configured individually
        read -r -a protocol_maps <<<"$(tr ',' ' ' <<<"$KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP")"
        for protocol_map in "${protocol_maps[@]}"; do
            # ......
        done
        # Configure Kafka using environment variables
        # This is executed at the end, to allow users to override properties set by the initialization logic
        kafka_configure_from_environment_variables
    else
        info "Detected mounted server.properties file at ${KAFKA_MOUNTED_CONF_DIR}/server.properties. Skipping configuration based on env variables"
    fi
    true
}
```
And `kafka_configure_from_environment_variables`:
```shell
kafka_configure_from_environment_variables() {
    # List of special cases to apply to the variables
    local -r exception_regexps=(
        "s/sasl\.ssl/sasl_ssl/g"
        "s/sasl\.plaintext/sasl_plaintext/g"
    )
    # Map environment variables to config properties
    for var in "${!KAFKA_CFG_@}"; do
        key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' | tr '[:upper:]' '[:lower:]')"
        # Exception for the camel case in this environment variable
        [[ "$var" == "KAFKA_CFG_ZOOKEEPER_CLIENTCNXNSOCKET" ]] && key="zookeeper.clientCnxnSocket"
        # Apply exception regexps
        for regex in "${exception_regexps[@]}"; do
            key="$(echo "$key" | sed "$regex")"
        done
        value="${!var}"
        kafka_server_conf_set "$key" "$value"
    done
}
```
This function iterates over all environment variables of the form `KAFKA_CFG_*`, converts each name into the corresponding property key (for example, `KAFKA_CFG_NUM_PARTITIONS` becomes `num.partitions`), and calls `kafka_server_conf_set` to write the property into the target file. `kafka_server_conf_set` is simply:
```shell
kafka_server_conf_set() {
    kafka_common_conf_set "$KAFKA_CONF_FILE" "$@"
}
```
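The name mapping performed by `kafka_configure_from_environment_variables` can be reproduced in isolation. Here is a minimal sketch of my own (not code from the image) of the prefix-strip / underscore-to-dot / lowercase conversion, ignoring the `sasl` and `clientCnxnSocket` special cases:

```shell
# Convert a KAFKA_CFG_* environment variable name to a Kafka property key:
# strip the prefix, replace underscores with dots, lowercase the result.
to_property_key() {
    echo "$1" | sed -e 's/^KAFKA_CFG_//' -e 's/_/./g' | tr '[:upper:]' '[:lower:]'
}

to_property_key "KAFKA_CFG_NUM_PARTITIONS"            # num.partitions
to_property_key "KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE" # auto.create.topics.enable
```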
That is how appending and overriding properties between environment variables and the configuration file is implemented. It is a surprisingly involved mechanism; I don't know whether all the Docker images bitnami provides (Redis, PostgreSQL, and so on) have this capability, but it would be genuinely useful if they did. Some official images I have used only accept configuration files, and I haven't explored whether they support both approaches together. Also, some of the shell script syntax is hard for me to follow.
### 3. Testing the Cluster

```shell
# Create a topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --replication-factor 3 --partitions 5 --topic test-topic
Created topic test-topic.
$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --partitions 5 --topic test-topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --create --topic test-topic
Created topic test-topic.

# Delete a topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --delete --topic test-topic

# List topics
$ kafka-topics.sh --bootstrap-server broker-1:9092 --list

# Describe the created topic
$ kafka-topics.sh --bootstrap-server broker-1:9092 --describe --topic test-topic
Topic: test-topic	TopicId: zFA0MFqBSKmmDYXuoda9Bw	PartitionCount: 5	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=104857600,retention.ms=2592000000,max.message.bytes=1048576
	Topic: test-topic	Partition: 0	Leader: 112	Replicas: 112,113,114	Isr: 112,113,114	Elr: 	LastKnownElr:
	Topic: test-topic	Partition: 1	Leader: 113	Replicas: 113,114,115	Isr: 113,114,115	Elr: 	LastKnownElr:
	Topic: test-topic	Partition: 2	Leader: 114	Replicas: 114,115,111	Isr: 114,115,111	Elr: 	LastKnownElr:
	Topic: test-topic	Partition: 3	Leader: 115	Replicas: 115,111,112	Isr: 115,111,112	Elr: 	LastKnownElr:
	Topic: test-topic	Partition: 4	Leader: 111	Replicas: 111,112,113	Isr: 111,112,113	Elr: 	LastKnownElr:

# Producer test
$ kafka-console-producer.sh --bootstrap-server broker-1:9092 --topic test-topic
> 111
> 222
> 333
> 444
> 555

# Consumer test
$ kafka-console-consumer.sh --bootstrap-server broker-1:9092 --topic test-topic --from-beginning
111
222
333
444
555

# Inspect the internal topic __consumer_offsets
$ kafka-topics.sh --bootstrap-server broker-1:9092 --describe --topic __consumer_offsets

# Describe a consumer group
$ kafka-consumer-groups.sh --bootstrap-server broker-1:9092 --group my-consumer-group --describe
```
## III. Miscellaneous

### 1. Building Your Own Kafka Image

### 2. Listener Configuration

Related links:

- kafka/docker/examples/README.md at trunk · apache/kafka (github.com)
- bitnami/kafka - Docker Image | Docker Hub
- apache/kafka - Docker Image | Docker Hub
- Docker 中部署 ZooKeeper 集群 | z2huo
- Kafka Broker 配置 | z2huo
OB links

[[Docker 中部署 ZooKeeper 集群]]
[[Broker 配置]]
[[chroot path]]
#Docker #Kafka #ZooKeeper #未完待续