kafka学习小结

简介

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

是 一个开源的 分 布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用

作用

缓存/消峰、解耦和异步通信

架构

生产者 brokers(4.0版本后取消了zookeeper) 消费者

模式

点对点模式:

消费者主动拉取数据,消息收到后清除消息

发布/订阅模式:

可以有多个topic主题

消费者消费数据之后,不删除数据

每个消费者相互独立,都可以消费到数据

发布/订阅:

消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息

生产者

流程图

架构

一.main线程 包含

1.拦截器 处理一些拦截作用 一般不使用

2.序列化器 序列化数据

3.分区器 将数据分到不同的双端队列 每个双端队列默认16k 全部队列内存为32m

二.sender线程 拉取双端队列的数据到kafka集群

1.selector打通好每个双端队列对应的kafka集群中的broker和分区的通道

每个broker节点的每个分区作为一个key 每个key是一个拉取队列 在没有确认应答情况下最多发送5条

双端队列达到batch.size最大值时会触发发送一次 或者 达到时间后会发送一次 默认时0ms

当发送后需要确认应答 根据asks的应答级别来处理 如果失败则重试 如果成功删掉对应key队列的数据

方式

同步发送 数据发到分区器 并且分区器把数据发送到了kafka集群 下一批数据才能发到分区器

异步发送 数据发到分区器 不管分区器是否发了数据到kafka集群 都可以继续发送到分区器

数据分区

指定分区

根据key的hash值分区

随机分区

自定义分区

提高吞吐量方法

数据可靠性

-1级别中也有个问题 会有数据重复的问题

要求数据既不能重复也不丢失 那么需要幂等性和事务

幂等性

事务

数据有序

单分区内,有序; 多分区,分区与分区间无序

数据乱序

生产者小结

生产者有main线程和sender线程

main线程有拦截器 序列化器和分区器

分区器有双端队列 每个topic的每个分区是一个队列 队列大小16k 总体大小为64m

当达到队列大小或者等到时间到了 sender线程会读取kafka集群的元数据然后找到对应的topic的分区的leader所在的节点 然后发送数据 每次最多发送5条没有返回确定的数据

kafka集群收到消息后 根据ack级别返回确定 -1表示leader和follower全部都要落盘了才返回确定

leader会有isr队列 就是跟follower有心跳包回复的数据

生产者可以开启幂等性 就是发的消息带有一个主键《pid,分区id,自增序号》 每次kafka重启都会给每个生产者分配一个pid

保证了单分区单会话的唯一性

事务是生产者定义一个唯一的事务id   kafka里有一个事务管理器和一个存储事务信息的事务主题 有50个分区   根据事务id的hash值%50得到找哪个事务管理器

必须要开启幂等性

生产者开启事务

从事务管理器得到pid

发消息给对应的leader

跟事务管理器申请预提交 事务管理器给对应的事务主题提交准备标志

事务管理器向对应的leader申请提交

对应的leader返回确定

事务管理器向事务主题提交提交标志

通过事务id和事务管理器和二阶段提交 保证了事务要么成功要么失败

消费者不可消费还没确定提交的消息

broker

存储的信息

brokers下的ids和topics  controller下的数据都是比较关键的元数据

controller下记录哪个broker节点的controller是主controller 控制选举的过程

在消费者和生成者中也会有一份数据

工作流程

节点服役和退役

在服役和退役前需操作数据的转移

副本

概念

AR = ISR + OSR

ISR,表示和 Leader 保持同步的 Follower 集合

OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本

故障选举

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作

选举规则:在isr中存活为前提,按 照AR中排在前面的优先。例如 ar[1,0,2], isr [1,0,2],那么leader 就会按照1,0,2的顺序轮询

topic分区

文件存储

每个topic都多个分区

每个分区会都有对应的log 但防止log文件过大  所以每个分区下有多个segment

每个segment包括 .log日志文件 默认大小1g .index稀疏索引文件 .timeindex时间戳索引文件

注意:

1.index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。 参数log.index.interval.bytes默认4kb

2.Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大, 因此能将offset的值控制在固定大小

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整参数修改保存时间。

日志删除

基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳

基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大

日志压缩

compact日志压缩:对于相同key的不同value值,只保留最后一个版本

高效读写数据

1.Kafka 本身是分布式集群,可以采用分区技术,并行度高

2.读数据采用稀疏索引,可以快速定位要消费的数据

3.顺序写磁盘 Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端, 为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这 与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间

4.页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高

PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用

消费者

消费方式

consumer采用从broker中主动拉取数据  pull(拉)模 式

pull模式不足之处是,如 果Kafka没有数 据,消费者可能会陷入循环中,一直返回 空数据

工作流程

消费者组

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化

coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset

消费者组消费过程

分区的分配和再平衡

一个consumer group中有多个consumer组成,一个 topic有多个partition组成,哪个consumer来消费哪个 partition的数据需要有个策略

Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略

指的就是leader的consumer制作消费方案的过程

Range

首先对同一个 topic 里面的分区按照序号进行排序,并 对消费者按照字母顺序进行排序。

通过 partitions数/consumer数 来决定每个消费者应该 消费几个分区。如果除不尽,那么前面几个消费者将会多 消费 1 个分区

注意:如果只是针对 1 个 topic 而言,C0消费者多消费1 个分区影响不是很大。但是如果有 N 多个 topic,那么针对每 个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消 费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

RoundRobin

RoundRobin 针对集群中所有Topic而言

RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

Sticky

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分 区不变化。

CooperativeSticky

采用合作式的分区重分配方式。当消费者组发生变更时,它不会一次性对所有分区进行重新分配,而是进行增量调整,只对受影响的分区进行移动。例如,当有新的消费者加入时,它会逐步将少量分区从现有消费者迁移到新消费者,避免了大规模的分区重新平衡,从而减少了消费者的停顿时间,提高了系统的可用

offset的维护

从0.9版本开始,consumer默认将offset保存在Kafka 一个内置的topic中,该topic为__consumer_offsets

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据

自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能

enable.auto.commit:是否开启自动提交offset功能,默认是true

auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

手动提交offset

commitSync(同步提交) 同步提交阻塞当前线程,一直到提交成 功,并且会自动失败重试(由不可控因素导致,也会出现提交失败) 必须等待offset提交完毕,再去消费下一批数据

commitAsync(异步提交) 异步提交则没有失败重试机制,故 有可能提交失败  发送完提交offset请求后,就开始消费下一批数据了

offset消费方式

指定 Offset 消费  auto.offset.reset = earliest | latest | none 默认是 latest

指定时间消费

漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交

用了kafka自动提交offset

消费者消费了数据    kafka自动提交的时间间隔还没到 消费者的数据落盘后消费者挂了  没有提交到最新的offset   等下次消费者再消费时读取的offset是之前的位置 则重复消费了

可以用redis分布式锁 key为topic加分区id加offset  超时时间大于自动提交的时间

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费

手动提交offset 先提交offset然后消费者数据还没落盘 挂了 导致漏消费

两种常见方式

用redis分布式锁 key为topic加分区id加offset   开启事务 并且在数据落盘和提交完offset才解锁

然后消费者开启事务 读取上次本地已消费的offset位置 如果小于当前的则执行 大于等于则抛弃

然后数据落地加本地再记录最新的offset落地 再提交事务 同时手动提交kafka的offset

还有一种就是消费者事务和kafka事务一起

说白了 重复消费用幂等性

漏消费就是事务处理完再提交offset

消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质

package main

import (
    "database/sql"
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "log"
    "os"
    "os/signal"
    "syscall"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    // 配置 Kafka 消费者
    kafkaConfig := &kafka.ConfigMap{
        "bootstrap.servers":        "localhost:9092",
        "group.id":                 "test_group",
        "auto.offset.reset":        "earliest",
        "enable.auto.commit":       false, // 禁用自动提交偏移量
        "isolation.level":          "read_committed",
        "transactional.id":         "consumer_transactional_id",
    }

    // 创建 Kafka 消费者实例
    consumer, err := kafka.NewConsumer(kafkaConfig)
    if err != nil {
        log.Fatalf("Failed to create consumer: %s", err)
    }
    defer consumer.Close()

    // 订阅 Kafka 主题
    topic := "test_topic"
    err = consumer.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        log.Fatalf("Failed to subscribe to topic: %s", err)
    }

    // 配置 MySQL 数据库
    db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/your_database")
    if err != nil {
        log.Fatalf("Failed to connect to MySQL: %s", err)
    }
    defer db.Close()

    // 初始化 Kafka 事务
    err = consumer.InitTransactions(0)
    if err != nil {
        log.Fatalf("Failed to initialize transactions: %s", err)
    }

    // 处理系统信号,优雅退出
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    run := true
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            // 开始 Kafka 事务
            err = consumer.BeginTransaction()
            if err != nil {
                log.Printf("Failed to begin transaction: %s", err)
                continue
            }

            // 从 Kafka 拉取消息
            ev := consumer.Poll(100)
            if ev == nil {
                // 回滚事务
                err = consumer.AbortTransaction(0)
                if err != nil {
                    log.Printf("Failed to abort transaction: %s", err)
                }
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                // 开启 MySQL 事务
                tx, err := db.Begin()
                if err != nil {
                    log.Printf("Failed to begin MySQL transaction: %s", err)
                    // 回滚 Kafka 事务
                    err = consumer.AbortTransaction(0)
                    if err != nil {
                        log.Printf("Failed to abort Kafka transaction: %s", err)
                    }
                    continue
                }

                // 将消息插入到 MySQL 数据库
                _, err = tx.Exec("INSERT INTO your_table (message) VALUES (?)", string(e.Value))
                if err != nil {
                    log.Printf("Failed to insert message into MySQL: %s", err)
                    // 回滚 MySQL 事务
                    err = tx.Rollback()
                    if err != nil {
                        log.Printf("Failed to rollback MySQL transaction: %s", err)
                    }
                    // 回滚 Kafka 事务
                    err = consumer.AbortTransaction(0)
                    if err != nil {
                        log.Printf("Failed to abort Kafka transaction: %s", err)
                    }
                    continue
                }

                // 提交 MySQL 事务
                err = tx.Commit()
                if err != nil {
                    log.Printf("Failed to commit MySQL transaction: %s", err)
                    // 回滚 Kafka 事务
                    err = consumer.AbortTransaction(0)
                    if err != nil {
                        log.Printf("Failed to abort Kafka transaction: %s", err)
                    }
                    continue
                }

                // 提交 Kafka 偏移量
                err = consumer.SendOffsetsToTransaction([]kafka.TopicPartition{
                    {Topic: e.TopicPartition.Topic, Partition: e.TopicPartition.Partition, Offset: e.TopicPartition.Offset + 1},
                }, nil)
                if err != nil {
                    log.Printf("Failed to send offsets to transaction: %s", err)
                    // 回滚 Kafka 事务
                    err = consumer.AbortTransaction(0)
                    if err != nil {
                        log.Printf("Failed to abort Kafka transaction: %s", err)
                    }
                    continue
                }

                // 提交 Kafka 事务
                err = consumer.CommitTransaction(0)
                if err != nil {
                    log.Printf("Failed to commit Kafka transaction: %s", err)
                }

            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                if e.Code() == kafka.ErrAllBrokersDown {
                    run = false
                }
            default:
                fmt.Printf("Ignored event: %v\n", e)
            }
        }
    }

    fmt.Println("Closing consumer")
}
    

一段mysql作为kafka消费者的事务代码

数据积压

1.如果是Kafka消费能力不足,则可以考虑增 加Topic的分区数,并且同时提升消费组的消费者 数量,消费者数 = 分区数

2.如果是下游的数据处理不及时:提高每批次拉取的数 量。批次拉取数据过少(拉取数据/处理时间 < 生产速度), 使处理的数据小于生产的数据,也会造成数据积压

kraft

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群, 而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进 行 Kafka 集群管理

这样做的好处有以下几个: ⚫ Kafka 不再依赖外部框架,而是能够独立运行; ⚫ controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升; ⚫ 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制; ⚫ controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。

线程架构

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值