Kafka-源码解析之-Broker文件存储(三)
摘要:消息存储对于每一款消息队列都非常重要,那么Kafka在这方面是如何来设计做到高效的呢?
Kafka这款分布式消息队列使用文件系统和操作系统的页缓存(page cache)分别存储和缓存消息,摒弃了Java的堆缓存机制,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
而提起磁盘的文件系统,相信很多对硬盘存储了解的同学都知道:“一块SATA RAID-5阵列磁盘的线性写速度可以达到几百M/s,而随机写的速度只能是100多KB/s,线性写的速度是随机写的上千倍”,由此可以看出对磁盘写消息的速度快慢关键还是取决于我们的使用方法。
鉴于此,Kafka的数据存储设计是建立在对文件进行追加的基础上实现的,因为是顺序追加,通过O(1)的磁盘数据结构即可提供消息的持久化,并且这种结构对于即使是数以TB级别的消息存储也能够保持长时间的稳定性能。在理想情况下,只要磁盘空间足够大就一直可以追加消息。此外,Kafka也能够通过配置让用户自己决定已经落盘的持久化消息保存的时间,提供消息处理更为灵活的方式。
Kafka结构层级概述
下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
Kafka部分名词解释如下:
- (1)Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群;
- (2)Topic:主题是对一组消息的抽象分类,比如例如page view日志、click日志等都可以以topic的形式进行抽象划分类别。在物理上,不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可使得数据的生产者或消费者不必关心数据存于何处;
- (3)Partition:每个主题又被分成一个或者若干个分区(Partition)。每个分区在本地磁盘上对应一个文件夹,分区命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始至分区总数减-1;
- (4)LogSegment:每个分区又被划分为多个日志分段(LogSegment)组成,日志段是Kafka日志对象分片的最小单位;LogSegment算是一个逻辑概念,对应一个具体的日志文件(“.log”的数据文件)和一个索引文件(“.index”,表示偏移量索引文件)组成;日志文件是一个包含FileMessageSet的文件集实际的消息。索引文件是一个OffsetIndex,它从逻辑偏移量映射到物理文件位置。每个段都有一个基偏移量baseOffset,基偏移量是这个段中的任何消息的最小偏移量,而且大于上一段中的任何偏移量。
- (5)Offset:每个partition中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到partition中。每个消息都有一个连续的序列号称之为offset—偏移量,用于在partition内唯一标识消息(并不表示消息在磁盘上的物理位置);
- (6)Message:消息是Kafka中存储的最小最基本的单位,即为一个commit log,由一个固定长度的消息头和一个可变长度的消息体组成;
Kafka文件存储机制分析
分析过程分为以下4个步骤:
- topic中partition存储分布
- partiton中文件存储方式
- partiton中segment文件存储结构
- 在partition中如何通过offset查找message
通过上述4过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。
####topic中partition存储分布
在三台虚拟机上搭建完成Kafka的集群后(Kafka Broker节点数量为3个),通过在Kafka Broker节点的/bin下执行以下的命令即可创建主题和指定数量的分区以及副本:
1 | ./kafka-topics --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic CommPair |
创建完主题、分区和副本后可以查到出主题的状态(该方式主要列举了主题所有分区对应的副本以及ISR列表信息):
1 | ./kafka-topics --describe --zookeeper 127.0.0.1:2181 --topic CommPair |
partiton中文件存储方式
通过在Kafka的config/server.properties配置文件中“log.dirs”指定的日志数据存储目录下存在三个分区目录,同时在每个分区目录下存在很多对应的日志数据文件和日志索引文件文件,具体如下:
1、分区目录文件1
2
3drwxr-x--- 2 root root 4096 Jul 26 19:35 CommPair-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 CommPair-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 CommPair-2
由上面可以看出,每个分区在物理上对应一个文件夹,分区的命名规则为主题名后接“—”连接符,之后再接分区编号,分区编号从0开始,编号的最大值为分区总数减1。每个分区又有1至多个副本,分区的副本分布在集群的不同代理上,以提高可用性。从存储的角度上来说,分区的每个副本在逻辑上可以抽象为一个日志(Log)对象,即分区副本与日志对象是相对应的。
- 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
- 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
partiton中segment文件存储结构
读者从2.2节了解到Kafka文件系统partition存储方式,本节深入分析partion中segment file组成和物理结构。
segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
下面文件列表是笔者在Kafka broker上做的一个实验,创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据,如下图2所示segment文件列表形象说明了上述2个规则:
下图是在三个Kafka Broker节点所组成的集群中分区的主/备份副本的物理分布情况图:
在partition中如何通过offset查找message
为了进一步查看“.index”偏移量索引文件、“.log”日志数据文件,可以执行下面的命令将二进制分段的索引和日志数据文件内容转换为字符型文件:
1、执行下面命令即可将日志数据文件内容dump出来1
2kafka-run-class kafka.tools.DumpLogSegments --files /data1/kafka/data/CommPair-0/00000000001050905938.index --print-data-log >00000000001050905938_txt.log
kafka-run-class kafka.tools.DumpLogSegments --files /data1/kafka/data/CommPair-0/00000000001050905938.log --print-data-log >00000000001050905938_txt.index
2、dump出来的具体日志数据内容1
2
3
4
5
6
7
8
9
10
11
12
13tail -20 00000000001050905938_txt.log
Dumping /data1/kafka/data/CommPair-0/00000000001050905938.log
Starting offset: 1050905938
offset: 1050905938 position: 0 isvalid: true payloadsize: 127 magic: 1 compresscodec: NoCompressionCodec crc: 2108247934 payload: 00000102-363aa04e-4467-41a9-828d-b131281ca9b6^2019-01-22 14:15:32^2019-01-22 14:21:02^UDP^172.16.140.202^49633^224.0.0.252^5355
...
offset: 1050906189 position: 37479 isvalid: true payloadsize: 126 magic: 1 compresscodec: NoCompressionCodec crc: 2291970542 payload: 00000102-363aa04e-4467-41a9-828d-b131281ca9b6^2019-01-22 14:20:04^2019-01-22 14:26:03^UDP^172.16.140.41^54911^224.0.0.252^5355
tail -20 00000000001050905938_txt.index
Dumping /data1/kafka/data/CommPair-0/00000000001050905938.index
offset: 1050906189 position: 37479
offset: 1050906505 position: 86271
offset: 1050906759 position: 125398
offset: 1050907068 position: 172701
由上面dump出来的偏移量索引文件和日志数据文件的具体内容可以分析出来,偏移量索引文件中存储着大量的索引元数据,日志数据文件中存储着大量消息结构中的各个字段内容和消息体本身的值。索引文件中的元数据postion字段指向对应日志数据文件中message的实际位置(即为物理偏移地址)。
- 1.日志数据文件
Kafka将生产者发送给它的消息数据内容保存至日志数据文件中,该文件以该段的基准偏移量左补齐0命名,文件后缀为“.log”。分区中的每条message由offset来表示它在这个分区中的偏移量,这个offset并不是该Message在分区中实际存储位置,而是逻辑上的一个值(Kafka中用8字节长度来记录这个偏移量),但它却唯一确定了分区中一条Message的逻辑位置,同一个分区下的消息偏移量按照顺序递增(这个可以类比下数据库的自增主键)。另外,从dump出来的日志数据文件的字符值中可以看到消息体的各个字段的内容值。
- 2.偏移量索引文件
如果消息的消费者每次fetch都需要从1G大小(默认值)的日志数据文件中来查找对应偏移量的消息,那么效率一定非常低,在定位到分段后还需要顺序比对才能找到。Kafka在设计数据存储时,为了提高查找消息的效率,故而为分段后的每个日志数据文件均使用稀疏索引的方式建立索引,这样子既节省空间又能通过索引快速定位到日志数据文件中的消息内容。偏移量索引文件和数据文件一样也同样也以该段的基准偏移量左补齐0命名,文件后缀为“.index”。
从上面dump出来的偏移量索引内容可以看出,索引条目用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由offset和position组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过“index.interval.bytes”设置索引的跨度;
有了偏移量索引文件,通过它,Kafka就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是Kafka中分段的日志数据文件和偏移量索引文件的对应映射关系图(其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。
segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。
从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构如下:
参数说明:
关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file 上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找 文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
从上述图3可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
总结
Kafka中读写message有如下特点:
- 写message
消息从java堆转入page cache(即物理内存)。
由异步线程刷盘,消息从page cache刷入磁盘。
- 读message
消息直接从page cache转入socket发送出去。
当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去
- Kafka高效文件存储设计特点
- Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
- 通过索引信息可以快速定位message和确定response的最大大小。
- 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
- 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。