- 查看当前服务器中的所有topic
bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
- 创建topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092--create --replication-factor 3 --partitions 1 --topic flink-demo;
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数片
删除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要在 server.properties 中设置delete.topic.enable=true,
否则只是标记删除
发送消息
bin/kafka-console-proucer.sh --broker-list hadoop102:9092 --topic first
消费消息
bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning:会把主题中以往所有的数据都读出来
查看某个topic的详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --describer --topic first
修改分区数
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
修改分区数
bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
验证生产的消息是否成功
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test
10.获取消费者组的id
./kafka-consumer-groups.sh --bootstrap-server haodoop102:9092,hadoop102:9092,hadoop104:9092 --list
11.查询__consumer_offsets 偏移量的位置信息
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]