Before installing Kafka and ZooKeeper, install a JDK; JDK 1.8 is recommended.
Network plan
- ZooKeeper cluster: 192.168.31.101, 192.168.31.102, 192.168.31.103
- Kafka cluster: 192.168.31.104, 192.168.31.105, 192.168.31.106

For development and test environments, it is recommended to disable the firewall and iptables by running the following commands as root:
# systemctl status firewalld.service
# systemctl stop firewalld.service
# systemctl disable firewalld.service
# systemctl status iptables.service
# systemctl stop iptables.service
# systemctl disable iptables.service
Download
-
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
Extract
-
mkdir -p /opt/dms
tar -xzvf kafka_2.10-0.10.2.0.tgz -C /opt/dms/
tar -xzvf zookeeper-3.4.10.tar.gz -C /opt/dms/
Deploy the ZooKeeper cluster
-
- On each of the three ZooKeeper nodes, create the zoo.cfg file by running the following command:
cat >/opt/dms/zookeeper-3.4.10/conf/zoo.cfg <<EOF
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/dms/zookeeper-3.4.10/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.31.101:2888:3888
server.2=192.168.31.102:2888:3888
server.3=192.168.31.103:2888:3888
EOF
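- On each of the three ZooKeeper nodes, create a myid file containing 1, 2, or 3 respectively, matching that node's server.N entry in zoo.cfg. Remember to change the number in the script below on each node.
mkdir -p /opt/dms/zookeeper-3.4.10/data
cat >/opt/dms/zookeeper-3.4.10/data/myid <<EOF
1
EOF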
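Because each node's myid has to match its server.N entry in zoo.cfg, the number can be looked up rather than edited by hand. The following is only a minimal sketch and not part of the original procedure: the function name `myid_for_ip` is hypothetical, and it assumes `awk` is installed and that zoo.cfg uses the `server.N=IP:2888:3888` form shown above.

```shell
#!/bin/sh
# Hypothetical helper (not part of ZooKeeper): print the N of the
# "server.N=IP:port:port" line in a zoo.cfg-style file whose address
# equals the given IP. Prints nothing if no line matches.
myid_for_ip() {
    cfg_file=$1
    ip=$2
    # Split on '=' and ':' so that $1 is "server.N" and $2 is the IP.
    awk -F'[=:]' -v ip="$ip" '
        $1 ~ /^server\.[0-9]+$/ && $2 == ip {
            sub(/^server\./, "", $1)   # keep only the numeric id
            print $1
            exit
        }' "$cfg_file"
}
```

For example, on 192.168.31.102, `myid_for_ip /opt/dms/zookeeper-3.4.10/conf/zoo.cfg 192.168.31.102 > /opt/dms/zookeeper-3.4.10/data/myid` would write 2, provided the data directory already exists.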
- Start the ZooKeeper cluster and check its status (run on each node from the ZooKeeper installation directory)
sh bin/zkServer.sh start
sh bin/zkServer.sh status
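- Stop the current ZooKeeper node to test whether the cluster keeps working. Each running node reports one of two modes, follower or leader, and a three-node ensemble tolerates the loss of one node.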
sh bin/zkServer.sh stop
- From a Kafka node, check the ZooKeeper version information: connect with telnet, then type srvr
telnet 192.168.31.101 2181
srvr
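The follower/leader check above can also be scripted instead of typed into telnet. This is a rough sketch under some assumptions: the function names `zk_mode` and `print_cluster_modes` are hypothetical, `nc` is assumed to be installed (its options vary between implementations), and the `srvr` reply is assumed to contain a `Mode: ...` line.

```shell
#!/bin/sh
# Hypothetical helper: read the reply to the "srvr" command on stdin
# and print the value of its "Mode:" line (for example leader or follower).
zk_mode() {
    awk -F': ' '$1 == "Mode" { print $2; exit }'
}

# Hypothetical helper: query each given host on port 2181 and print
# "host mode", or "host unreachable" when no Mode line comes back.
# Assumes an nc that accepts -w <seconds> as a timeout.
print_cluster_modes() {
    for host in "$@"; do
        mode=$(echo srvr | nc -w 2 "$host" 2181 2>/dev/null | zk_mode)
        printf '%s %s\n' "$host" "${mode:-unreachable}"
    done
}
```

Calling `print_cluster_modes 192.168.31.101 192.168.31.102 192.168.31.103` before and after stopping a node should show how the remaining nodes report their mode.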
ZooKeeper is now configured.
Deploy the Kafka cluster
-
- Edit the server.properties configuration file on each of the three Kafka nodes
vim config/server.properties
Kafka node 192.168.31.104:
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.104:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.31.104:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/dms/kafka_2.10-0.10.2.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181
zookeeper.connection.timeout.ms=6000
Kafka node 192.168.31.105:
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.105:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.31.105:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/dms/kafka_2.10-0.10.2.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181
zookeeper.connection.timeout.ms=6000
Kafka node 192.168.31.106:
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://192.168.31.106:9092
port=9092
advertised.listeners=PLAINTEXT://192.168.31.106:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/dms/kafka_2.10-0.10.2.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181
zookeeper.connection.timeout.ms=6000
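The three configurations above differ only in broker.id and in the address used by listeners and advertised.listeners. As an illustration, the per-node lines could be generated rather than typed three times. This is a sketch only: the function name `broker_overrides` is hypothetical, and port 9092 is taken from the settings above.

```shell
#!/bin/sh
# Hypothetical helper: print the settings that differ between brokers,
# given a broker id and that node's IP address.
broker_overrides() {
    broker_id=$1
    ip=$2
    cat <<EOF
broker.id=${broker_id}
listeners=PLAINTEXT://${ip}:9092
advertised.listeners=PLAINTEXT://${ip}:9092
EOF
}
```

For instance, `broker_overrides 2 192.168.31.106` prints the three node-specific lines for the third broker; the remaining settings are identical on every node.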
- Start the Kafka cluster (run on each Kafka node from the Kafka installation directory)
sh bin/kafka-server-start.sh -daemon config/server.properties
Check the Kafka process:
ps -ef|grep -v grep|grep kafka
On a ZooKeeper node, view the broker information for the Kafka cluster. Because the Kafka nodes 192.168.31.105 and 192.168.31.106 have not been started yet, only one broker is listed.
sh bin/zkCli.sh -server localhost:2181 ls /brokers/ids
- Create a topic
Create a topic named my-topic with 1 partition and 1 replica per partition
sh bin/kafka-topics.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --create --topic my-topic --replication-factor 1 --partitions 1
- Add partitions to a topic
sh bin/kafka-topics.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --alter --topic my-topic --partitions 3
- Delete a topic (this takes effect because delete.topic.enable=true is set in server.properties)
sh bin/kafka-topics.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --delete --topic my-topic
- List all topics in the cluster
sh bin/kafka-topics.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --list
- Show details of a topic
sh bin/kafka-topics.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --describe --topic my-topic
- Produce messages
sh bin/kafka-console-producer.sh --broker-list 192.168.31.104:9092,192.168.31.105:9092,192.168.31.106:9092 --topic my-topic
- Consume messages (newer versions use --bootstrap-server; older versions use --zookeeper)
sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.104:9092,192.168.31.105:9092,192.168.31.106:9092 --from-beginning --topic my-topic
sh bin/kafka-console-consumer.sh --zookeeper 192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181 --from-beginning --topic my-topic
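The commands above repeat the same two address lists many times. One possible way to cut down on typos is to build each list once and reuse it through a variable. The following is a sketch: the function name `join_hosts` and the variable names `ZK_CONNECT` and `BROKER_LIST` are hypothetical and are not Kafka settings.

```shell
#!/bin/sh
# Hypothetical helper: join hosts into "host:port,host:port,...".
# The first argument is the port; the remaining arguments are hosts.
join_hosts() {
    port=$1
    shift
    out=""
    for host in "$@"; do
        # Prefix a comma only when $out is already non-empty.
        out="${out:+$out,}${host}:${port}"
    done
    printf '%s\n' "$out"
}

ZK_CONNECT=$(join_hosts 2181 192.168.31.101 192.168.31.102 192.168.31.103)
BROKER_LIST=$(join_hosts 9092 192.168.31.104 192.168.31.105 192.168.31.106)
```

With these set, a command such as `sh bin/kafka-topics.sh --zookeeper "$ZK_CONNECT" --list` is equivalent to the longer form shown earlier.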