前言
使用 Sarama 消费 Kafka 集群消息,除了 ConsumerGroup 方式外还可以使用 Consumer 方式。早期的Kafka版本只能用 Consumer 方式,特点是 Consumer 消费 topic 中所有的消息。N个 Consumer 消费同一个 topic ,相当于一个 topic 中的消息被消费了N遍。
创建 Consumer 消费消息示例
package main
import (
"log"
"github.com/IBM/sarama"
)
func main() {
// 指定topic
topic := "topic1"
// 指定kafka集群版本
config := sarama.NewConfig()
config.Version = sarama.DefaultVersion
consumer, err := sarama.NewConsumer([]string{"host1:9092", "host2:9092", "host3:9092"}, config)
if err != nil {
log.Printf("start consumer error: %v\n", err)
return
}
partList, err := consumer.Partitions(topic) // 取所有分区
if err != nil {
log.Printf("get list of partition error: %v\n", err)
return
}
var wg sync.WaitGroup
for partition := range partList { // 遍历所有分区
// 每分区创建一个消费者
load, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
if err != nil {
log.Printf("start partition %d consumer error:%v\n", partition, err)
return
}
defer load.AsyncClose()
wg.Add(1)
// 消费消息
// 示例代码,每个Partition仅消费一次,打印消息后退出
// 由于消息太长,仅打印前20个字节
go func(sarama.PartitionConsumer) {
defer wg.Done()
for msg := range load.Messages() {
log.Printf("Partition:%d Offset:%d Length:%v Value:%v\n", msg.Partition, msg.Offset, len(msg.Value), string(msg.Value[:20]))
break
}
}(load)
}
wg.Wait()
}
总结
需要注意 Consumer 消费完成后需要自行维护 offset ,使用msg.Offset
可以取到当前的offset。
sarama 还提供了两个特殊的 offset :sarama.OffsetOldest
和sarama.OffsetNewest
,表示 topic 最老和最新的 offset。