Golang 使用 Sarama 消费 Kafka 集群消息 (2)


前言

使用 Sarama 消费 Kafka 集群消息,除了 ConsumerGroup 方式外还可以使用 Consumer 方式。早期的Kafka版本只能用 Consumer 方式,特点是 Consumer 消费 topic 中所有的消息。N个 Consumer 消费同一个 topic ,相当于一个 topic 中的消息被消费了N遍。

创建 Consumer 消费消息示例

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	// 指定topic
	topic := "topic1"
	
	// 指定kafka集群版本
	config := sarama.NewConfig()
	config.Version = sarama.DefaultVersion
	
	consumer, err := sarama.NewConsumer([]string{"host1:9092", "host2:9092", "host3:9092"}, config)
	if err != nil {
		log.Printf("start consumer error: %v\n", err)
		return
	}
	partList, err := consumer.Partitions(topic) // 取所有分区
	if err != nil {
		log.Printf("get list of partition error: %v\n", err)
		return
	}

	var wg sync.WaitGroup
	for partition := range partList { // 遍历所有分区
		// 每分区创建一个消费者
		load, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
		if err != nil {
			log.Printf("start partition %d consumer error:%v\n", partition, err)
			return
		}
		defer load.AsyncClose()
		wg.Add(1)

		// 消费消息
		// 示例代码,每个Partition仅消费一次,打印消息后退出
		// 由于消息太长,仅打印前20个字节
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range load.Messages() {
				log.Printf("Partition:%d Offset:%d Length:%v Value:%v\n", msg.Partition, msg.Offset, len(msg.Value), string(msg.Value[:20]))
				break
			}
		}(load)
	}
	wg.Wait()
}

总结

需要注意 Consumer 消费完成后需要自行维护 offset ,使用msg.Offset可以取到当前的offset。

sarama 还提供了两个特殊的 offset :sarama.OffsetOldestsarama.OffsetNewest,表示 topic 最老和最新的 offset。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值