本文由阿里云钉群直播整理而来。
讲师介绍:
丁威:中通科技技术平台部资深架构师。《RocketMQ技术内幕》作者,社区直播讲师。开源爱好者,关注分布式、云计算、大数据领域。目前主要负责消息中间件与全链路压测的实施与落地。
本次分享将主要围绕以下四个方面展开
1、如何学习RocketMQ之我所见。
2、路由注册、发现、剔除设计模式。
3、消息发送高可用设计。
4、RocketMQ存储设计。
5、RocketMQ消息消费。
6、RocketMQ HA(主从同步)。
一、如何学习RocketMQ之我所见
为大家介绍自身学习RocketMQ经验,给大家学习提供了借鉴的思路。
首先是通读RocketMQ官方文档,特别是RocketMQ 3.x版本设计手册,从全局了解RocketMQ的设计理念,需要解决的问题等。从官方文档设计理念,大家会发现官方文档中不仅囊括了RocketMQ,还包括了MQ中间件涉及的各个方面,比如MQ通用的角色如Prodcuer消息生产者,Consumer消息消费者,Push Consumner推模式,Pull Consumner拉模式,Producer Group生产者组,Consumner Group消费者组通过这些名词介绍你就会对RocketMQ有一个整体的了解,了解RocketMQ要解决那些问题,如何订阅和发布的实现机制是什么,还有RocketMQ存储特点零拷贝原理,相信大家肯对会这些疑问产生好奇心,提出问题。通过反复阅读官方文档对RocketMQ的整体有大概的认识,同时也会给大家带来一些思考,如果让你来实现这些功能你会怎么做,如果自己不会,是不是可以带着这个问题看看RocketMQ是怎样实现的。这时大家会发现学习新东西不是非常困难。
其次下载RocketMQ源码不要立马查看源码,大家可以重点关注example包这是官方提供的使用示例。在阅读RocketMQ源码中,首先关注的是example官方示例,通过对官方提供示例的学习可以知晓RocketMQ的使用方式,注意事项,从而达到使用目的。但在大家学会使用之后要想驾驭RocketMQ并且能够处理工作中遇到得各种问题,分析源码是最好的方式。首先通过分析源码的过程中大家通过他的实现细节,了解工作原理方便为以后生产实际工作中在出现问题时提供解决问题的思路和方法。另一方面是RocketMQ的代码质量非常高,RocketMQ拥有高性能。为了实现高性能会涉及到很多方面,比如说RocketMQ在多线程方面的实践,在高并发编程中基于文件的设计模式,基于Nitty的网络通信等待这些在分析源码的过程中能够提升大家的工作中处理问题的能力,对我们大家自身编程能力的提升是非常有帮助的。在这里特别提示分析源码首先要有一定给分布式的基础,在大家会发现分析源码有难度时,介绍了自身如何通过六个月的时间打下坚实的基础,真正看懂源码的过程。还介绍到个人博客,大家可以通过逆序查看从而了解讲师在查看RocketMQ源码之前做了那些准备工作,比如集合,锁,Netty基础,对于学习分布式系统同学,这些基础都是需要掌握的。
如果大家对源码实在看不懂时又想全面的学习RocketMQ的知识体系,可以通过阅读《RocketMQ技术内幕》一书来进行学习,书中对RocketMQ的原理,设计理念,实践细节讲的非常透彻,相信对大家的学习会起到一定的帮助。
官方地址:http://rocketmq.apache.org/
讲师CSDN地址:https://me.csdn.net/prestigeding
二、路由注册、发现、提出设计模式
首先介绍了业界我们接触经常到的一些中间件的服务注册-发现的实现原理。在此这列举了Dubbo中注册-发现机制中的实时推送模式。介绍Dubbo是如何提供服务的,首先服务提供者启动时会向注册中心发送消息注册自己也就是告知注册中心自己可以提供服务了,那如何注册呢?注册中心在收到服务提供者发送的消息后会创建节点名称为的服务提供者的全路径名例如com.springboot.dubbo.demo.Demoservice文件,同时在对应节点目录产生四个节点如下图所示。
在服务提供者启动时通过事件机制,发送事件消息的方式推送给注册中心,注册中心收到消息后在对应providers目录增加一条记录保存该服务。对于服务消费者通过订阅注册中心消息知晓那些服务提供者可以提供服务从而服务被消费者远程调用。如果注册中心providers下面的节点增加或者是减少时,注册中心都会通过事件机制发送消息及时的通知到服务消费者。同时为了保证服务高可用,服务提供者每30s向注册中心发送消息告知自己的状态正常。这种模式的优点是实时性非常高,只要提供者有变动,消费者都能及时的知道,缺点就是服务注册实现较复杂,至少每个服务都需要具备消息的发布-订阅能力,而且,像zookeeper内部实现复杂,并不适用于RocketMQ,有点大财小用。
RocketMQ核心概念
1)Topic:消息主题,一级消息类型,生产者向其发送消息。
2)生产者:也称为消息发布者,负责生产并发送消息至主题Topic。
3)消费者:也称为消息订阅者,负责从主题Topic 接收并消费消息。
4)消息:生产者向 主题Topic 发送并最终传送给消费者的数据和(可选)属性的组合。
5)消息属性:生产者可以为消息定义的属性,包含 Message Key 和 Tag。
6)Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
7)生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。
一个生产者集群可以发送多个主题Topic 消息。发送分布式事务消息时,如果生产者中途意外宕机,消息存储者Broker 会主动回调生产者集群的任意一台机器来确认事务状态。
8)消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。一个消费者集群下的多个消费者以均摊方式消费消息。
如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。
RocketMQ如何事件服务发现-注册-拉取模式
RocketMQ使用拉取模式实现主题Topic路由有什么缺点呢?
1.主题Topic路由中心(NameServer)Topic是基于最终一致性,极端情况下会出现数据不一致。
2.客户端无法实时感知路由信息的变化,例如某台消息存储Brocker自身进程为关闭,但停止向NameServer发送心跳包,但生产者无法立即感知该Brocker服务器的异常,会对消息发送造成一定的可用性?
RocketMQ并不打算解决上述问题,因为基于上述的设计,RocketMQ NameServer的实现非常简单高效,至于消息发送的高可用,则有消息发送客户端自己保证。
RocketMQ的设计遵循的一个设计理念:崇尚“缺陷美”,简单,高性能。
如果在知道定时拉取模式的不足时,有哪些方法方式去解决这些问题,带着这些问题去研究RocketMQ源码可以获得更大的收获,事半功倍。
三、消息发送高可用设计
问题:RocketMQ消息发送如何实现高可用?
答案:RocketMQ消息发送分三步首先是Topic路由寻址,其次是选择消息队列,最后是消息发送重试,Broker规避。比如存在主题Topic A有两条路由存储消息的Broker A 和存储消息的Broker B 共8个队列(Broker A q1, Broker A q2, Broker A q3,Broker A q4, Broker B q1, Broker B q2, Broker B q3,Broker B q4)。在RocketMQ客户端向RocketMQ集群发送消息的时候,首先要选择队列对于多个队列的选择系统默认使用轮询机制。比如发送第一条消息时如果选择Broker A收到了消息,那么发送第二条消息则会选择Broker B,发送第三条消息重新开始一轮选择Broker A发送,依次不断轮询发送,这也是RocketMQ默认的负载均衡机制。
如果RocketMQ客户端选择Broker A q1发送一条消息后,Broker A因为一些其他的原因导致Broker A不可用,RocketMQ客户端尝试进行重新发送,RocketMQ客户端第一次选择Broker A q2发送,第二次RocketMQ客户端选择Broker A q2发送,发现第一次和第二次都失败,RocketMQ客户端会重试两次,共发送三次。当Broker A故障导致不可用时,无论对Broker A重试多少次都会失败,RocketMQ客户端重试三次失败,则该消息被告知发送失败。RocketMQ采用规避机制解决次问题,首先RocketMQ客户端第一次向Broker A发送消息失败时在第二次选择Broker时会规避掉Broker A 的所有队列(Broker A q1, Broker A q2, Broker A q3,Broker A q4),也就是说Broker A的所有队列不参加选择,也就是第二次选择会选择Broker B 上的队列,这样可以保证第一次消息发送失败后,第二次可以成功发送消息,从而实现高可用。RocketMQ为了保证高可用提供了另外一种机制设置一个时间在RocketMQ客户端第一次向Broker A发送消息失败后在设定时间内RocketMQ客户端不再向不可用的Broker A发送消息,进一步保证高可用。
建议大家看发送消息的源码可用重点关注上面提到的高可用的机制,进一步探寻RocketMQ高可用的设计思想重试加规避。
四、RocketMQ存储设计设计
RocketMQ的存储设计是RocketMQ的最重要的部分,采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。作为一款高性能的MQ要具有一下特点。
1.吞度量Tps很高,能够支持高并发。
2.响应延时要很短。
3.支持海量消息的堆积能力。
以上特点离不开RocketMQ存储机制。
首先介绍RocketMQ存储设计整体组织方式
其次是RocketMQ存储设计之CommitLog文件,CommitLog是消息存储文件,所有主题Topic消息到达Broker后按顺序存储在CommitLog文件中的。每个CommitLog文件默认大小为1GB,固定文件大小方便内存映射 。通过对RocketMQ源码分析,学习RocketMQ如何完成内存映射的实现方式给大家一些借鉴的思想。RocketMQ对CommitLog这样的定长文件理解为一个逻辑的物理文件,巧妙的构造了文件名,比如第一文件名是00000000000000000000是以物理磁盘上文件的偏移量为文件名,对于第二个文件名0000000000010733741824就是以第一文件的偏移量作为文件名的。RocketMQ对CommitLog这样设计的优点能够快速定位到一条消息到达Brocker后落在那个文件中,拥有很高检索的效率。
对于一条按顺序写入CommitLog文件的消息,虽然极大的提高了文件的写入性能,但对于消息读取消息就会很慢,为了解决读取速度慢的问题RocketMQ引入ConsumeQueue文件类似于kaffka的队列文件称为消息消费队列文件。ConsumeQueue文件是对于CommitLog文件的基于Topic的索引文件,主要用于消费者根据Topic消息消费时,其组织方式为/topic/queue,同一队列存在多个文件,ConsumeQueue设计及其巧妙,每个条目使用固定长度(8字节CommitLog物理偏移量,4字节消息长度,8字节tag的 hashCode),这里不是存储tag的原始字符串,而是存储hashCode,目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式快速定位条目,极大的提高CommitLog的读取性能,试想一下,消息消费者根据Topic,消息消费进度(ConsumeQueue逻辑偏移量),即第几个ConsumeQueue条目,这样根据消费进度去访问消息的方法为使用逻辑偏移量logicOffset*20即可找到条目的起始偏移量(ConsumeQueue文件中的偏移量),然后读取该偏移量后20个字节即得到了一个条目,无需遍历整个ConsumeQueue文件。
相信大家在探究源码的过程中深刻理解上面的设计理念
然后是RocketMQ基于文件的Hash索引。类比mysql的Hash索引方式,基于文件的HashMap方式。提供了通过消息属性检索消息的机制,使用定长的方式,可以像使用数组一样去方便的检索。比如想要把一个订单编号的数据,首先要把订单编号的HashCode放在,通过HashCode在500W个Hash槽中取出一个,再去判断取出的hash槽中有没有消息,如果hash槽位为-1则有数据,
37:24IndexFile文件基于物理磁盘文件按实现Hash索引。其文件有40字节的文件,500W个Hash槽组成。每个hash槽为4个字节,最后由2000W的Index条目组成,每个条目由20个字节构成,分别为4字节索引key的HashCode、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(Hash冲突的链表结构)。
1、 内存映射。
2 、基于文件定长设计,应用数组的结构,方便检索。
3 基于HashCode的设计。
五、RocketMQ消息消费设计
首先介绍RocketMQ消息消费概要。消息消费通常需要考虑消息队列负载、消费模式、拉去机制、消息过滤、消息消费(处理消息)、消费进度反馈、消息消费限流等方面。
1.消息队列负载模式:RocketMQ集群内(同一消费组内)的消费者共同承担主题Topic下所有消息的消费,即一条消息只能被集群中一个消费者消费。
RocketMQ的队列负载原则是一个消费者可以共同承担同一主题下的多个消息消费队列,但同一个消息消费队列同一时间只允许被分配给一个消费者。
2.消息消费模式:RocketMQ执行集群消费和广播消费两种模式。
3.消息拉取模式:RocketMQ消息拉取支持推、拉两种模式,其本质为拉模式。
4.消息消费:RocketMQ支持顺寻消息、并发消息两种模式,每个消费组使用独立的线程池来处理拉取到的消息。
5.RocketMQ的消息消费端的限流主要包含两个维度:
1)消息堆积数量
如果消息消费处理队列中的消息条数超过1000条会出发消费端的流控,其具体做法是放弃本次拉取动作,并且延迟50ms后将放入该拉取任务放入到pullRequestQueue中,每1000条流控会打印一次消费端流控日志。
2)消息堆积大小
如果处理队列中堆积的消息总内存大小超过100M,同样触发一次流控。
并发消息拉取与消费流程
首先一个消费客户端有两个线程(PullMessageService线程和RebalanceService线程)工作,PullMessageService线程负责拉取消息,从阻塞队列pullRequestQueue中通过take的方式获取一条拉取消息的任务,如果队列pullRequestQueue为空时,则PullMessageService线程阻塞。怎么唤醒队列,则需要RebalanceService线程每20s进行一次队列重新负载,获取主题Topic的所有消息队列与当前订阅该主题的所有消费者按照队列负载算法分配队列
并发消息拉取与消费的几个核心要点:
1.PullMessageService线程与RebalanceService线程的交互。
2.每个消费组一个一个线程池,用来异步处理消息。
3.消息进度反馈。
RocketMQ消息消费-消费进度反馈机制
拉取流程:
1.PullMessageService从Brocker服务器拉取一批消息,默认32条。
2.先存储到本地处理队列ProcessQueue。
3.提交到消费组线程池,异步执行。
4.提交到线程池后,继续在从Brocker服务器拉取下一批消息。
思考:由于是并发消息,例如thread-1线程在消费消息msg1,thread-2线程在消费消息msg2,thread-3线程在消费消息msg3,此时如果thread-3线程先消费完消息msg3,但thread-1线程,thread-2线程还没处理完消息msg1,消息msg2,那thread-1线程是如何向消息存储者Brocker反馈消息msg3的偏移量?
在这里提示重复消费的问题是由业务方处理。
RocketMQ主从同步
RocketMQ的主从同步主要是为了读写分离并不提供主从服务切换功能,当主节点服务宕机后,RocketMQ只提供读取服务不提供写入服务。
实现步骤
1.首先启动Master服务并在指定端口进行监听。
2.客户端启动,主动连接Master服务,连接TCP连接。
3.客户端每5s(纠正不是5s只要有消息就会拉取,读不到消息时休眠5s再次拉取消息)的间隔时间向服务端拉取消息,如果第一次拉取的话,先获取本地commitLog文件中最大的偏移量,以该偏移量向服务端拉取数据。
4.服务端解析请求,并返回一批数据给客户端。
5.客户端收到一批数据后,将消息写入本地。
commitLog文件中,然后向Master服务汇报拉取进度,并更新下一次待拉取偏移量。
6.然后重复第三步。
问题1:主,从服务器都在运行中,消息消费者是从主节点拉取消息还是从从节点拉取?
答:默认情况下,RocketMQ消息消费者从主服务器拉取,当主服务器积压的消息超过物理内存的40%,则建议从从服务器拉取,但如果slaveReadEnable为false,表示从服务器不可读,从服务器也不会接管消息拉取。
问题2:当消息消费者向从服务器拉取消息后,会一直从从服务器拉取?
答:不是的,分如下情况:
1.如果从服务器的slaveReadEnable设置为false,则下次拉取,从主服务器拉取。
2.如果从服务器允许读取并且从服务器积压的消息为超过其物理内存的30%,下次拉取使用的Brocker为订阅组的Brocker指定的Brocker服务器,改制默认为0,代表为主服务器。
3.如果从服务器允许读取并且从服务器积压的消息超过了其物理内存的30%,下次拉取使用的Brocker为订阅组的whichBrockerWhenConsumeSlowly指定的Brocker服务器,该值默认为1,代表从服务器。
问题3:主从服务消息消费进度是如何同步的?
答:消息消费进度的同步是单向的,从服务器开启一个定时任务,定时从主服务器同步消息消费进度;无论消息消费者是从主服务器拉取的消息还是从从服务器拉取的消息,在向Broker反馈消息消费进度时,优先向主服务器汇报;消息消费者向主服务器拉取消息时,如果消息消费者内存中存在消息消费进度时,主服务器会尝试更新消息消费进度。
如果主服务器宕机后恢复后,消息消费者是否会重复消费?
当主服务器宕机恢复后,从服务器在同步消费进度时同步到的消息消费进度还是主服务器宕机前的进度,从而造成重复消费。只要消息消费者不重启的情况下,消息消费进度还是实时的。还是之前的,如果在此期间消息消费者重启了,那么重复消费就无法避免。在RocketMQ使用过程中很多地方会引发重复消费
大家可以通过以上讲述内容为切入点更深入的了解RocketMQ。
直播答疑
问题1:如果发送的是顺序消息,Brocker挂了,怎么做规避策略?还是说就是无法发送?
丁慧:顺序消息指的是消息消费方式两种(顺序消费和并发消费)RocketMQ能够保证进入消费者的消息按顺序消费,并不是消息发送者。例如订单场景下,我们会使用订单编号作为key,RocketMQ能够保证同一个单号的所有消息发送到同一个队列。如果Brocker宕机后队列数量会减少,消息会重新发送就无法保证发送的顺序性。如果要保证发送的顺序性,可以使用一个Topic一个队列,这样会牺牲你的高可用性。RocketMQ顺序消息指的是消费的顺序性,而不是发送的顺序性。
问题2:那些场景会用到MQ?
丁威: MQ的使用场景1对流量进行削锋填谷操作,使用他的消息堆积能力。例如双十一期间,订单量Tps是平时的几倍或者几十倍,如果通过服务来处理,无法抵挡订单洪峰,可以使用MQ进行降为打击,比如你的订单到达系统后先将订单放入MQ中,然后消费者的数量是有限的,可以平稳的通过异步的方式处理订单,保证系统高可用,不会造成你的服务在关键时刻宕机。使用MQ作为大量消息的挡箭牌,抵挡订单洪峰。2解耦系统模块,降低系统复杂性。比如当用户登陆后,要送优惠券、送积分时,就可以在用户登录后发送消息到MQ,你的优惠券系统,积分系统等都可以订阅MQ接到消息后再去派发优惠券或是积分等其他业务。
问题3:业务端如何解决重复消费的问题?
开发者:可以借助key+redis去重。