kafka支持两种方式回溯。一种是基于消息偏移量回溯,一种是基于时间点的消息回溯。
基于消息偏移量回溯
在kafka的每个分区中,每条消息都有一个唯一的offset值 ,即消息偏移量,用来表示消息在partition分区中的位置。消费者每次消费了消息,都会把消费的此条消息的offset提交到broker(消息节点),用于记录消费到分区中的位置,下条消息从这个位置之后开始消费。所以基于消息偏移量回溯很简单,只需要重置offset,然后消费者会从该offset之后开始消费
基于时间点的消息回溯
要想讲清楚kafka基于时间点的消息回溯的原理,得先从kafka存储消息的文件格式开始讲。
kafka存储消息是以日志的形式存储的,每一个partition都对应一个日志,但是日志不是一个文件,是多个文件组成的。日志文件都存储在一个文件夹里面的,文件格式为: topic-0 。
其中topic是kafka对应的主题名称、0是partition所在的分区号。文件夹里面存储的是什么文件呢,日志分段文件、偏移量索引文件、时间戳索引文件。
日志分段文件
kafka消息存储在一个.log的日志文件中,但是随着日志文件越来越大不利于消息的维护与清理,也不利于集群扩容时消息的复制。所以kafka需要对日志进行分段。
日志分段文件名称的定义:
日志分段名称是由日志从这日志片段开始的基准偏移量( baseOffset )命名的,名称固定为 20 位数字。因为baseOffset是long型的,long型最大值19位,所以文件名20位即可满足所以的偏移量要求
例如:00000000000000000054.log
当kafka判断一个日志文件很大了时,就会重新开辟一个.log的日志文件进行消息写入,对老的.log文件设置为只读,不能写入。kafka判断消息需要分片的策略有4中,满足其一即可。
1.当前日志分段文件的大小超过了broker端参数 log.segmentbytes配置的值。log.segmentbytes参数的默认值为 1073741824,即lGB
2.当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.hours。默认情况下,配置了 log.roll.hours参数,其值为168天。
3.偏移量索引文件或时间戳索引文件的大小达到broker端参数log.index.size.max.bytes配置的值。log.index.size.max.bytes 的默认值为10485760,即10MB
4.追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,
偏移量索引文件
文件存储的是消息对应在物理磁盘的地址。是以key、value形式存在的。索引文件采用稀疏索引( sparse index )的方式构造消息的索引。它并不保证每个消息在索引文件中都有对应的索引 每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096 ,即 4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
文件格式:00000000000054.index 。其中54依然是索引文件中存在的第一个消息的offset(偏移量)
索引格式如下:
每个索引占用 8 个字节,分为两个部分。
1. relativeOffset :日志索引的相对偏移量。相对于文件名的基准偏移量来说的,比如索引的第一个消息,那么relativeOffset是0
2. position :消息存储在磁盘的物理位置
那现在应该知道,通过消息的偏移量怎么找到对应的日志分段文件,然后之后的所有消息都知道了。
例如现在日志分段文件是这样的: 那么对应的索引文件是这样的:
0000000000000000012.log 0000000000000000012.index
0000000000000000034.log 0000000000000000034.index
0000000000000000078.log 0000000000000000078.index
其中0000000000000000034.index偏移量索引文件内容如下:
那么我们要找到偏移量为56的消息怎么找呢?
1. 首先定位到0000000000000000034.log 的日志文件。那么它是怎么找到这个日志文件的呢,kafka中用跳跃表存储了日志文件baseOffset对应的日志文件名。通过跳跃表很容易查到不大于56的最大的baseOffset,然后定位到日志文件。可以计算出56相对于34的相对偏移量为22。
2. 既然定位到日志文件是0000000000000000034.log。那么索引文件也肯定是0000000000000000034.index。在步骤1中,已经算出了相对偏移量为22,即对应在index索引文件是relativeOffset。那么我们需要在0000000000000000034.index中找到不大于22的最大的relativeOffset,是按照第一索引消息顺序找到的,即定位到22的索引。拿到对应的position
3.拿到了对应的物理磁盘position,既能直接找到消息,然后顺序往后查找,既能拿到所有的消息进行消息回溯
时间索引文件
文件格式:文件格式:0001586662165087.timeindex 。其中1586662165087对应的是这个时间点生成的时间索引文件
存储的时间索引内容格式:
每个索 项占用 12 个字节,分为两个部分。
1.timestamp :时间戳。
2.relativeOffset :时间戳所对应的消息的相对偏移量。
那么是怎么通过时间戳找到定义的消息的呢?
我们先通过时间索引文件找到时间对应的offset偏移量,在通过偏移量索引文件找到消息位置。
例如我们要回溯2020-4-12 09:00:00的之后消息:
1.首先换算成时间戳为1586653200000
2.根据时间戳1586653200000在时间索引中找到不大于1586653200000的最大的偏移量
3.找了偏移量,按照偏移量索引讲解的步骤,逐一去查找,即可找到对应的消息position
4.通过position定位了消息,获取消息的生成时间,比1586653200000进行比对,然后按顺序逐渐和后面的消息一一进行时间戳比对,如果前一个消息的时间戳<1586653200000 & 后一个消息的时间戳 > 1586653200000 。 那么这个位置是就是消息回溯点,拿到消息的offset,对消费者消费记录的offset进行重置,那么整个回溯就完成了