讲完了消息的生产过程,接下来记录一下消息发送到Broker后如何存储和分发。

前言

首先看下RocketMQ Broker的消息存储目录结构,RocketMQ的消息存储在本地文件系统中,默认在当前用户主目录下的store目录中:

  • abort:该文件在Broker启动后自动创建,正常关闭自动消息,如果没有启动Broker的情况下看到此文件表示上次Broker是非正常关闭的
  • checkpoint:存储着commitlog,consumequeue,index文件的最后刷盘时间
  • commitlog:存放commitlog文件,消息就是存储在其中
  • consumerqueue:存放consumequeue文件,队列就是存放在其中
  • index:存放消息索引文件,支持根据key查询的依据
  • lock:运行期间使用到的全局资源锁

消息存储

如何保证高性能读写

传统的IO读写方式,存在多次上下文切换和多次数据的拷贝,在并发极高的MQ场景下,会严重影响读写效率。所以为了减少内核态<=>用户态切换,减少数据拷贝次数,引入了零拷贝技术。

零拷贝

零拷贝是一种思想,指的是CPU不需要先将数据从某处内存复制到另一个特定区域,实现零拷贝的方式有以下几种方式:

  • mmap()
  • sendfile()

mmap()

mmap(memory map) 是一种内存映射方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对应关系。
对应上图中内核缓冲区->用户缓冲区->Socket缓冲区的过程直接变成了内核缓冲区-(CPU拷贝)->Socket缓冲区,减少了一次拷贝所需的时间。
但整个过程中,上下文切换依旧是4次,相比于传统IO没有提升,切换过程:

  1. 当用户发起mmap调用的时候会发生上下文切换1,进行内存映射;
  2. 然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2;
  3. 随后用户调用write,发生上下文切换3;
  4. 将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。
FileChannel fileChannel = 
new RandomAccessFile("mmap_test.txt", "rw").getChannel(); 
MappedByteBuffer mappedByteBuffer = 
fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());

sendfile()

sendfile()和mmap()一样可以减少一次CPU拷贝,但它可以减少2次上下文切换,切换过程:

  1. 用户发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区
  2. 之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。
//调用transferTo方法向目标数据传输 
FileChannel channel = FileChannel.open(Paths.get("./test.txt"),
 StandardOpenOption.WRITE, StandardOpenOption.CREATE); 

channel.transferTo(position, len, target);

commitlog(mappedFile)

目录与文件

真正存储消息,一个Broker只有一个commitlog目录,所有Topic的消息都存放于此。

mappedFile大小1G,文件名由20位十进制数组成,表示当前文件的第一条消息的起始位置偏移量。

  • 第一个文件名一定是20位0构成,第一条消息的偏移量commitlog offset为0.
  • 当第一个文件放满时,会自动生成第二个文件继续存放消息,假设第一个1073741820字节(1G=1073741824字节),最后只剩4字节不够存放下一条数据。则下一个文件名为00000000001073741824

消息单元

consumequeue

存储topic消息在commitlog的位置索引。

为了提高效率,会为每个topic在~/store/consumequeue中创建一个目录{TopicId}/{QueueId}consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息在commitlog中的位置。

consumequeue文件名也是由20位数字组成,表示当前文件的第一个索引条目的起始位置偏移量,与mappedFile文件不同的是,其后续的文件名也是固定的,因为consumequeue文件大小是固定不变的。

每个consumequeue文件可以包含30w个索引条目,其中包含:

  • 消息在mappedFile文件中的偏移量commitlog offset
  • 消息长度
  • 消息Tag的hashcode值
    这三个属性占用20个字节,所以每个文件的大小是固定的30w * 20字节


对文件的读写

消息写入

一条消息进入到Broker后经历了以下几个过程才最终被持久化:

  • Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset
  • 将queueId、queueOffset等数据,与消息一起封装为消息单元
  • 将消息单元写入到commitlog
  • 形成消息索引条目
  • 将消息索引条目分发到相应的consumequeue

消息拉取

当Consumer来拉取消息时会经历以下几个步骤:

  • Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消费offset

消费offset即消费进度,consumer对某个queue的消费offset,即消费到了该queue的第几条消息
消息offset = 消费offset + 1

  • Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
  • Broker计算在该consumequeue中的queueOffset

queueOffset = 消息offset * 20字节

  • 从该queueOffset处开始向后查找第一个指定Tag的索引条目
  • 解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset
  • 从对应commitlog offset中读取消息单元,并发送给Consumer

刷盘机制

RocketMQ消息写入到Commitlog文件中时并不是直接写入到文件,而是先写到PageCache中,也就是前面IO图中的内核缓冲区,所以RocketMQ也和MySQL等类似的log刷盘机制

异步刷盘

写入到PageCache后直接返回生产者消息存储成功,另外的后台线程在将消息刷到磁盘。其提供了2套刷盘机制:

  • 固定时间,默认每隔0.5s会刷一次【默认方式】
  • 每存一次会通知刷盘,但不会等待结果,同时如果0.5s没有收到通知,也会主动刷盘。

同步刷盘

同步刷盘机制很好理解,即每次一定等写入成功磁盘才会返回生产者写入成功,此方式可靠性更高,但会一定程度上影响系统吞吐量。

后记

本文参考:
https://juejin.cn/post/7186880907582636069