KAFKA分布式安装及使用¶
本文中,将选择ubuntu16.04(AMD64)操作系统作为例,进行KAFKA分布式部署安装及实践。 kafka依赖于zookeeper和java,因此在安装kafka之前,请确保zookeeper及java正常运行。具体java及zookeeper安装,可见hadoop ha分布式部署相关章节。我们本文中,安装 kafka使用hadoop用户安装。
1-软件及环境准备¶
1.1 本次使用三台服务器安装kafka。其中,master、master-0、slaver-1安装kafka集群,slaver-1、slaver-2、slaver-3安装zookeeper集群。
package hostname
kafka、java master
kafka、java master-0
kafka、java、zookeeper slaver-1
java、zookeeper slaver-2
java、zookeeper slaver-3
# kafka 版本2.12 zookeeper版本3.4.12 java 1.8
1.2 下载软件包。KAFKA源选取合适的包下载安装. 将kafka安装包在master节点解压到/opt目录,并改名为kafka。
$ tar -xzvf kafka_2.12-1.1.0.tgz -C /opt/
$ cd /opt
$ mv kafka_2.12-1.1.0/ kafka
1.3 修改kafka配置文件;
root@master:/opt/kafka/config# vi server.properties
root@master:/opt/kafka/config# grep -vE '^#|^$' server.properties
broker.id=1 #根据服务器配置,每个服务器号唯一
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=slaver-1:2181,slaver-2:2181,slaver-3:2181 #zookeeper 集群节点;
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
1.4 将kafka解压包使用scp命令拷贝至集群其他节点,并将文件mv至/opt目录下,修改配置文件中的broker.id项。
1.5 修改各个服务器文件目录权限。
root@master:/opt# chown -R hadoop-1:hadoop-1 kafka/
2-启动服务¶
2.1 启动zookeeper集群,并确保zookeeper正常运行。 2.2 登录到三台服务器上,使用hadoop用户启动kafka。
hadoop@master:/opt/kafka$ bin/kafka-server-start.sh config/server.properties & # 此为临时启动命令,关闭窗口后kafka停止运行,后台日志报错
# 后端运行命令。 # ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
# 1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。
hadoop@master:/opt/kafka$ jps
18097 NameNode
22690 Kafka
18538 ResourceManager
23246 Jps
18462 DFSZKFailoverController
3-创建TOPIC¶
创建一个名称为test-zhao的Topic,3个分区,并且复制因子为1,执行如下命令:
hadoop-1@master:/opt/kafka$ bin/kafka-topics.sh --create --zookeeper slaver-1:2181,slaver-2:2181,slaver-3:2181 --replication-factor 1 --partitions 3 --topic test-zhao
创建成功后,可在kafka数据目录查看,分别在master、master-0、slaver-1主机的kafka数据目录, 即配置文件中的log.dirs=/var/kafka-logs,形成partition为 test-zhao-0,test-zhao-1,test-zhao-2的文件夹, 文件夹下xxx.log是消息集文件, xxx.index 偏移量索引文件 ,xxx.timeindex 时间戳索引文件;
查看已创建的topic;
hadoop-1@master:/opt/kafka$ ./bin/kafka-topics.sh --list --zookeeper slaver-2:2181
test-zhao
# 查看topic信息。
hadoop-1@master:/var/log/kafka-logs/test-zhao-0$ /opt/kafka//bin/kafka-topics.sh --describe --zookeeper slaver-1:2181 --topic test-zhao
Topic:test-zhao PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test-zhao Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test-zhao Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test-zhao Partition: 2 Leader: 3 Replicas: 3 Isr: 3
可以看到 partition0在id为1的broker上,其数据副本也在broker1上,并且broker1为leader状态。 我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
4-模拟客户端发送消息¶
Kafka自带一个命令行客户机,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每一行都将作为单独的消息发送。 使用如下指令发送消息。
hadoop-1@master:/opt/kafka$ bin/kafka-console-producer.sh --broker-list master:9092, master-0:9092, slaver-1:9092 --topic test-zhao
>cecgw-kafka-zhaoyuanjie-first
我们在master节点,模拟发送了”cecgw-kafka-zhaouanjie-first”的消息。我们通过字符串查找,可以看到,该消息落到了slaver-1节点日志中。
hadoop-1@slaver-1:/var/log/kafka-logs/test-zhao-2$ grep 'cecgw' ./ -R
Binary file ./00000000000000000000.log matches
可以看出该文件中,有发送的消息内容。通过kafka自带的命令,可以将二进制文件,转化为字符类型文件。
opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /var/log/kafka-logs/test-zhao-2/00000000000000000000.log --print-data-log
Dumping /var/log/kafka-logs/test-zhao-2/00000000000000000000.log
Starting offset: 0
offset: 0
position: 0
CreateTime: 1551680240403
isvalid: true
keysize: -1
valuesize: 29
magic: 2 #这个占用1个字节,主要用于标识 Kafka 版本。
compresscodec: NONE
producerId: -1
producerEpoch: -1
sequence: -1
isTransactional: false
headerKeys: []
payload: cecgw-kafka-zhaoyuanjie-first
# 查看index文件内容
hadoop-1@slaver-1:/var/log/kafka-logs/test-zhao-2$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /var/log/kafka-logs/test-zhao-2/00000000000000000000.index --print-data-log
Dumping /var/log/kafka-logs/test-zhao-2/00000000000000000000.index
offset: 0 position: 0
我们仅在slaver-1节点查找到了该字符串,因此数据备份因子为1生效。
5-启动消费者¶
同样,KAFKA可以使用命令行启动消费者服务。再启动一个shell终端,执行如下命令:
hadoop-1@slaver-1:/var/log/kafka-logs/test-zhao-2$ /opt/kafka/bin/kafka-console-consumer.sh --zookeeper slaver-1:2181, slaver-2:2181, slaver-3:2181 --from-beginning --topic test-zhao
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
cecgw-kafka-zhaoyuanjie-first
目前已经正常收到消息。
查看消费者组。
hadoop-1@master:/home/ubuntu$ /opt/kafka/bin/kafka-consumer-groups.sh --zookeeper slaver-1:2181 --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
console-consumer-40107
console-consumer-99536
console-consumer-60818
console-consumer-57442
testgroup
console-consumer-61493
console-consumer-79230
console-consumer-10997
console-consumer-20932
console-consumer-1732
console-consumer-86987
查看某一消费者组描述信息。
hadoop-1@master:/home/ubuntu$ /opt/kafka/bin/kafka-consumer-groups.sh --zookeeper slaver-1:2181 --group console-consumer-40107 --describe
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
test-new 0 11 39996 39985 -
test-new 1 11 40002 39991 -
test-new 2 12 33335 33323 -
#消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id.
- 删除topic。
hadoop-1@master:/opt/kafka$ bin/kafka-topics.sh --delete --zookeeper slaver-1:2181 --topic test-zhao
Topic test-zhao is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[2019-03-04 15:05:49,125] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-04 15:05:49,172] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
[2019-03-04 15:05:49,172] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaAlterLogDirsManager)
[2019-03-04 15:05:49,177] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-zhao-0 (kafka.server.ReplicaFetcherManager)
[2019-03-04 15:05:49,177] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-zhao-0 (kafka.server.ReplicaAlterLogDirsManager)
[2019-03-04 15:05:49,180] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
[2019-03-04 15:05:49,180] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions (kafka.server.ReplicaAlterLogDirsManager)
[2019-03-04 15:05:49,181] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test-zhao-0 (kafka.server.ReplicaFetcherManager)
[2019-03-04 15:05:49,181] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions test-zhao-0 (kafka.server.ReplicaAlterLogDirsManager)
[2019-03-04 15:05:49,219] INFO Log for partition test-zhao-0 is renamed to /var/log/kafka-logs/test-zhao-0.fd5fa204b9a54209afd39ced6263e026-delete and is scheduled for deletion (kafka.log.LogManager)
可以看到各个节点上的partition均已经删除掉。
7、创建一个复制因子为2,partition为3的主题:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic