[转]漫谈消息队列:以Kafka和RocketMQ为例
1. 前言
消息队列是一种帮助开发人员解决系统间异步通信的中间件,常用于解决系统解耦和请求的削峰平谷的问题。它是一种中间件,意味着它是面向研发人员而非终端用户的产品,它的存在不能直接的创造价值,但可以有效的简化研发的开发工作。下面,我试着用一个简单的例子来展示下 MQ 的打开方式。
2. 一个电商系统的演进
一个最简单的电商系统,至少包含以下流程:
- 用户下单
- 付款
- 配送
相信大家都能理解为什么我们不会在一个系统中实现所有功能。 那好,我们假设三个功能简单的对应三个系统: 用户端系统,也就是给用户使用的前端,可以是 web 也可以是 app 或者小程序,这不重要;支付系统,对接银行或其他金融机构,完成付款流程;物流系统,负责商品配送的跟踪。
显而易见,用户如果想购买商品,会在前端系统中选中自己喜欢的商品,然后它需要付款,之后由物流人员进行配送。 那么问题来了,当用户选购物品之后,前端系统如何将订单消息转给支付系统? 很简单,支付系统可以给前端系统提供一个接口,当有订单生成时,前端系统调用支付系统的接口将消息传递给支付系统,完成付款流程。同样,支付系统也可以把订单消息再传给物流系统,逻辑清晰,架构简单,一切看起来都很美好,完全不需要消息队列。
但是接下来,随着业务的发展,又有了新的需求。公司新建了一套风控系统,防止用户恶意操作。 所以风控系统也需要接受到所有的订单请求。 按照前面的方式,前端调用支付系统后,又多了一步调用风控系统接口的操作。之后的之后又有了优惠卷系统,也需要获取订单信息,还有公司财务系统,内控合规,大数据统计,广告系统,用户推荐系统。前端系统的研发每天忙于和各个系统进行对接,完全没有时间优化用户体验。更糟糕的是,后来老板心血来潮,要做一次 69 大促,优惠套路及具欺骗性。届时订单量会有百倍的增长,各个系统当前处理能力严重不足。理想情况下,各个系统可以横向无限扩容,简单说就是增加服务器数量。但这样会带来严重的成本支出,尤其是当我们考虑到有些系统,尤其是大数据相关的其实对消息实时性要求并不高,这些成本会显得很没必要。
还好,God bless 码农。消息队列横空出世,拯救研发狗于水深火热中。前端系统只需将订单信息发送的 MQ 中,而不用关心都有谁需要接受订单信息。其他所有系统从 MQ 中获取消息,而且前端系统也不用关系其他系统收到消息后是否处理成功,MQ 可以帮助我们处理这些问题,这就是我们所说的系统节藕。 大促时,也只需要保证核心系统有充足的处理能力即可,对于处理能力较弱的系统,在流量峰值时 MQ 系统可以将消息暂时保存,下游系统可以优哉游哉的慢慢处理,这就是我们所说的削峰平谷。
那么问题来了,MQ 为什么这么牛逼呢?它是如何实现这些功能的? 我们一个个慢慢讲。
3. MQ 的基本概念
3-1. 三种常见消息协议
- JMS(Java Message Service): JMS 本质上是 JAVA API。在 JMS 中定义了 Producer,Consumer,Provider 三种角色,Producer 作为消息的发送方,Consumer 作为消息的接收方,Provider 作为服务的提供者,Producer 和 Consumer 统称为 Client。JMS 定义了点对点和发布订阅两种消息模型,发布订阅模型中,通过 topic 对消息进行路由,生产者可以将消息发到指定的 topic,消费者订阅这个 topic 即可收到生产者发送的消息。一个生产者可以向一个或多个 topic 中发送消息,一个消费者也可以消费一个或多个 topic 中的消息,一个 topic 也可以有多个生产者或消费者,生产者和消费者只需要关联 topic,而不用关心这消息由谁发送或者消费。 Provider 为每一个 topic 维护一个或多个 queue 来保存消息,消息在 queue 中是有序的,遵循先进先出的原则,不同 queue 间的消息是无序的。点对点模式中没有 topic 的概念,生产者直接将消息发送到指定 queue,消费者也指定 queue 进行消费,消息只能被一个消费者消费,不可以被多个消费者消费。Kafka 和 RocketMQ 都实现了或部分实现了 JMS 协议。
- AMQP(Advanced Message Queuing Protocol): 与 JMS 不同,AMQP 是一个应用层的网络传输协议,对报文格式进行定义,与开发语言无关。在 AMQP 中同样有生产者,消费者两种角色,消息也是保存在 queue 中的。 但不同于 JMS 用 topic 对消息进行路由,AMQP 的路由方式由 exchange 和 binding 决定。 client 可以创建 queue,并在创建 queue 的同时通知 exchange 这个 queue 接受符合什么条件的消息,这个条件即为 Bingding key。生产者发送消息到 exchange 的时候会指定一个 router key,exchange 收到消息后会与自己所维护的 Bingding key 做比较,发送到符合条件的 queue 中。消费者在消费时指定 queue 进行消费。RabbitMQ 实现了 AMQP 协议。
- MQTT(Message Queuing Telemetry Transport):MQTT 协议是一种基于发布订阅的轻量级协议,支持 TCP 和 UDP 两种连接方式,主要应用于即时通讯,小型设备,移动应用等领域。 MQTT 中有发布者(Publish),订阅者(Subscribe)和代理服务器(Broker)三种角色。Broker 是服务的提供者,发布者和前两种协议中的生产者相同,将消息(Message)发送到 Broker,Subscribe 从 Broker 中获取消息并做业务处理。MQTT 的 Message 中固定消息头(Fixed header)仅有 2 字节,开销极小,除此之外分为可变头(Variable header)和消息体(payload)两部分。固定头中包含消息类型,消息级别,变长头的大小以及消息体的总长度等信息。 变长头则根据消息类别,含有不同的标识信息。 MQTT 允许客户端动态的创建主题,发布者与服务端建立会话(session)后,可以通过 Publish 方法发送数据到服务端的对应主题,订阅者通过 Subscribe 订阅主题后,服务端就会将主题中的消息推送给对应的订阅者。
3-2. 系统解藕
一般来说,系统间如果需要通信,除了正常情况下的消息传递外,还要考虑下游系统处理异常,上游系统如何处理?系统宕机的情况下会不会导致数据丢失?当有业务数据异常时,如何去定位是上游系统发送出了问题还是下游系统的问题?如果需要同时将信息发送给多个下游系统,其中一个处理有问题会不会导致其它系统受影响?而 MQ 可以让这些问题变得简单。
3-3. 推 / 拉两种模式
在消费中,一般有推消息和拉消息两种模式。推模式即服务端收到消息后,主动将消息推送给消费者,由消费者进行处理,这种模式具有更高的实时性,但是由于服务端不能准确评估消费端的消费性能,所以有可能造成消息推送过多使客户端来不及处理收到的消息; 拉模式则是服务端收到消息后将消息保存在服务端,被动的等待客户端来拉取消息,这种模式下客户端可以根据自己的处理能力来决定拉消息的频率,但是缺点就是消息处理可能有延迟,不过可以通过长轮询的方式来提高实时性。
3-4. 三种消息级别
消息传递过程中,会有各种异常导致消息不能正常发送,这时候,我们有以下三种选择:
- 下游允许部分消息丢失,不进行处理,这种方式一般适用于监控信息和 log 的传递,少一两条影响不大,称为至多一次(Qos=0);
- 还有一种是消息必须全部送达,不允许任何消息丢失,但是可以接受部分消息重复,这种我们称为至少一次(Qos=1),此种方式一般适用于订单,支付等场景(当然,这要求下游系统实现去重或幂等);
- 还有一种最严格的要求,就是消息只能送达一次,不能多也不能少,这种我们称为正好一次(Qos=2)。
这三种方式又分别是如何实现的呢?
3-4-1. 至多一次的实现
要实现至多一次并不难,生产者只需要异步发送,在发送失败或者消费失败的时候不做任何处理即可。MQ 在消费者拉走消息后,就直接将消息标记为已经消费或者删除消息。在监控系统和日志系统中,丢失部分信息是可以接受的,但显然,电商系统,金融系统等大部分业务,是不允许出现消息丢失这种情况的,需要保证消息一定会送达到消费者。
3-4-2. 至少一次的实现
至少一次的实现一般如下:生产者发消息到 MQ,MQ 收到消息后返回确认信息(ACK)给生产者,生产者收到确认信息后生产过程完成,如果在一定时间内,生产者没有收到确认信息,生产者重新发送消息。 重新发送的过程可以是立即发送,也可以将处理异常的消息持久化,比如保存到数据库中,然后定时重试知道成功。同样,消费者从 MQ 获取到消息后,当业务逻辑处理完成,向 MQ 返回 ACK 信息。 但是存在下面一种情况,当 MQ 收到消息并发送 ACK,或者消费者消费完成发送 ACK 信息之后,由于网络,系统故障等问题,ACK 信息没有成功送达,就会导致消息重复发送。 对于大部分消息队列的实现来说(如 kafka,RocketMQ)对于消息重复的处理方式,就是不处理,交由消费者根据业务逻辑自己实现去重或幂等。消费者根据业务逻辑自己实现去重或幂等。消费者根据业务逻辑自己实现去重或幂等。 重要的事情说三遍。有些人或许会觉得这是常识和基本素养,但也有部分同学过于相信 MQ 系统和网络环境的稳定性,不做去重导致业务出现问题,比如优惠卷系统没有做去重处理,本来只能领取一张的优惠卷,结果给用户发了多张。
3-4-3. 正好一次的实现
如果要实现正好一次的消息级别,每次消息传递过程正需要四次通信,过程如下: 发送端发消息给接收端,接收端收到消息后持久化保存消息 ID 并返回 REC 信息给发送端,通知生产端我已经收到这个消息了。 这时消息是一种中间态,接受端不会进行业务逻辑的处理。这个过程中,如果 REC 消息丢失,服务端重传了消息, 接受端接受到消息后会和本地保存到消息 ID 做对比,如果重复,就丢弃消息不做处理,避免消息被处理多次,而且消息 ID 会持久化到硬盘,防止因为断电内存中数据丢失倒是消息被重复处理。 发送端收到接收端返回的 rec 消息后,发送一个 rel 请求给消费端,告诉消费端我确认收到了你的确认消息,接收端收到 rel 请求后才会进行具体的业务逻辑处理,并返回 comp 信息给发送端,同时在本地删除保存的消息 ID。如果发送端没有收到 comp 信息,会重发 rel 请求而不会重发消息。
以上,就是正好一次的实现过程。如果你没看懂,那就再看一遍,如果还是没看懂,那也没关系,至少,你知道这玩意实现起来很复杂就好了。也就明白了为什么大部分消息中间件一般只保证至少一次,去重的过程交给消费者自己处理了。 毕竟对于大多数场景,吞吐量才是首要指标。 那么,什么场景下,需要保证正好一次呢? 答案是物联网。 而标准的实现协议,是 MQTT。 因为物联网场景下,大部分终端是嵌入式系统,处理能力会比服务器低很多,所以服务端需要帮助终端实现去重,简化终端的业务逻辑。
3-5. 数据的可靠性
生产者将消息发到消息队列的 Broker 之后,如果 Broker 只将消息保存在内存中,那当服务器断电或各种原因导致宕机时, 还没有被消费的消息将会丢失。 为了解决这个问题,可以选择将数据持久化到硬盘,这样当机器故障恢复后数据还在,消费者可以继续消费之前没有消费完的数据。但是,如果仅仅持久化到硬盘,当服务器发生磁盘故障,Raid 卡故障时,数据依然存在丢失的风险。 为了解决这个问题,绝大多数消息队列的实现都引入了复制 / 多副本的概念,将每份数据都保存在多台服务器上,而且一般这些服务器还要尽可能多实现跨机架甚至跨数据中心。 复制可以是同步的也可以是异步的,可以是一主一从,也可以是一主多从,也可以基于 Raft,Paxos 等算法实现多副本。
不管是持久化还是复制,在保证数据可靠性的同时,都必然会带来一部分性能的损耗,所以不同的消息队列实现根据自己的定位,会选择不同的复制的实现方式以及持久化时的文件结构。下面,我根据自己的理解聊一聊常见的文件结构和复制方式的优缺点,只是个人观点,仅供参考。
4. Kafka 和 RocketMQ 的文件结构对比
其实消息队列的持久化,除了本地写文件外,还可以持久化到 K-V 存储或者关系型数据库中,但是性能会比较差,我们就不做讨论了,我们只聊聊持久化到本地文件系统中。而最常见的两种文件结构,一种是 Kafka 所使用的,一种是 RocketMQ 所使用的。
Kafka 会在 Broker 上为每一个 topic 创建一个独立的 partiton 文件,Broker 接受到消息后,会按主题在对应的 partition 文件中顺序的追加消息内容。而 RocketMQ 则会创建一个 commitlog 的文件来保存分片上所有主题的消息。
Broker 接收到任意主题的消息后,都会将消息的 topic 信息,消息大小,校验和等信息以及消息体的内容顺序追加到 commitlog 文件中,commitlog 文件一般为固定大小,当前文件达到限定大小时,会创建一个新的文件,文件以起始便宜位置命名。同时,Broker 会为每一个主题维护各自的 ConsumerQueue 文件,文件中记录了该主题消息的索引,包括在 commitlog 中的偏移位置,消息大小及校验和,以便于在消费时快速的定位到消息位置。ConsumerQueue 的维护是异步进行的,不影响消息生产的主流程,即使 ConsumerQueue 没有及时更新的 情况下,服务异常终止,下次启动时也可以根据 commitlog 文件中的内容对 ConsumerQueue 进行恢复。
这样的文件结构也就决定了,在同步刷盘的场景下,RocketMQ 是顺序写,而 Kafka 是随机写。通常情况下,我们认为顺序写的性能远高于随机写,尤其时对于传统的机械硬盘来讲更是如此。 且当 Broker 上的 topic 数量增多时,RocketMQ 在写消息的性能上几乎不会受到影响,而对 Kafka 的影响则会较大。
而在消费时,因为可以根据一定的缓存策略将热数据提前缓存到内存中,所以不管哪种方式对于磁盘的要求都不是太高。不过对于 RocketMQ 来说,内存加载时会加载一整个 commitlog 文件,如果同一个 Broker 上的两个主题,一个主题的消息积压了很长时间开始才开始消费,而另一个主题在及时消费新发送的消息时,Broker 可能会频发的读取文件更新到缓存中,造成磁盘性能损耗,进而影响到生产时的发送性能。所以虽然 RocketMQ 支持海量消息积压,但如果是在共享的集群中,还是建议用户最好能做到及时消费,保证集群中所有主题都在消费相近时间段的消息,这样命中内存缓存的概率会比较高,消费时不会带来额外的磁盘开销。
需要补充说明的是,在做技术选型时,还需要考虑到硬件的发展。现今固态硬盘虽然价格较机械硬盘还是高出很多,但普及度越来越高。而固态硬盘在乱序写时,性能表现比机械硬盘会好很多,特别是多线程同时进行写操作时,性能也会比单线程顺序写强。对于需要同步刷盘保证数据可靠性的应用,磁盘读写性能的重要性一般来讲也会远高于磁盘的空间大小。 成本上来讲,如果可以显著的提高单机性能,虽然单价来看固态硬盘更加昂贵,但是如果可以节省部分 CPU,内存和机架位置,还是很划算的。
5. 服务可用性保障—复制与 failover
复制的实现,最简单的方式就是一主一从的结构,开源版本的 RocketMQ 即使用了这种模式。由两个 Broker 实例组成一组服务,一个作为主节点,提供读写服务,一个作为从节点,在正常情况下只从主节点同步数据不提供读写服务,且每个 topic 都会分配到多个 Broker 分组上。当某个从节点发生故障时,可以禁止主节点的写入,依然允许消费者继续消费该节点中未处理完成的消息。而生产者有新消息过来时,由其它主从都健康的分组提供服务, 直到故障机器恢复后主节点重新提供读写服务。如果故障机器无法恢复,只需等积压消息全部消费完,替换故障机器即可。 如果主节点故障,则可以在从节点进行消费,其它处理方式与从节点故障处理方式一致。 这种方式的优点是逻辑简单,实现也简单,简单意味着稳定,隐藏的 bug 少。且数据只需要一份冗余,对磁盘空间的开销相对较少,可以保证大多数情况下的数据可靠性和服务可用性。
Kafka 的复制策略,使用的是 ISR(可用服务列表)的方式,可以把他看成主从结构的升级版。对于每一个 partiton,可以分配一个或多个 Broker。 其中一个作为主节点,剩余的作为跟随者,跟随者会保存一个 partition 副本。生产者将消息发送到主节点后,主节点会广播给所有跟随者,跟随者收到后返回确认信息给主节点。 用户可以自由的配置副本数及当有几个副本写成功后,则认为消息成功保存。且同时,会在 ZooKeeper 上维护一个可用跟随者列表,列表中记录所有数据和主节点完全同步的跟随者列表。当主节点发生故障时,在列表中选择一个跟随者作为新的主节点提供服务。在这种策略下,假设总共有 m 个副本,要求至少有 n 个(0<n<m+1)副本写成功,则系统可以在最多 m-n 个机器故障的情况下保证可用性。
还有一种实现是基于 Raft 算法实现的多副本机制,具体细节可以参考官方的 paper。Raft 集群一般由奇数节点构成,如果要保证集群在 n 个节点故障的情况下可用,则至少需要有 2n+1 个节点。 与 ISR 方式相比,Raft 需要耗费更多的资源,但是整个复制和选举过程都是集群中的节点自主完成,不需要依赖 ZooKeeper 等第三者。 理论上 Raft 集群规模可以无限扩展而 ISR 模式下集群规模会受限于 ZooKeeper 集群的处理能力。
6. 消息队列的高级特性
6-1. 顺序消息
一般情况下,因为消息分布在不同的 Broker 上,且有多个客户端同时消费,各实例间的网络状态和处理能力都是不一定的,所以分布式消息系统是没有办法保证消息的处理顺序的。但如果你了解了一般消息队列的文件结构,你就会发现不管是 Kafka 的 partition 那种方式,还是 RocketMQ 的方式,都可以保证同一个 partition 或者同一个 ConsumerQueue 内的消息是可以保证顺序的。剩下的,我们需要做的就是将需要保证顺序的消息放入到同一个 partiton 或者 queue 中就好了, 最简单的方式是我们只为主题分配一个 partition 或者 queue,这样就可以保证严格的顺序,但是这样就不能体现分布式系统的性能优势了,集群的处理能力没有办法横向扩展。
在实际的生产中,大多数情况下我们其实并不需要所有的消息都顺序处理,更多时候只要求具有相同特征的消息保证顺序,如电商系统中,一般要求具有相同订单号的消息需要保证顺序,不同的订单之间可以乱序,也就是说我们只要保证有办法将具有相同订单编号的消息放入到同一个队列或者 partition 中即可。
Kafka 提供了指定 partition 发送的功能,使用者可以在客户端根据业务逻辑自行处理,还有的消息队列支持根据某个字段的值,将消息 hash 到消息指定消息队列中。 指定 partition 和 hash 两种方式的主要区别,就是当有某个分片故障时,指定 partition 的方式会导致部分消息发送失败,而 hash 的方式有可能造成少量消息的乱序。
6-2. 事物消息
事务消息主要指消息生产过程中,需要确保发送操作和其它业务逻辑处理结果的一致性,要么都成功要么都失败。 比如要同时执行写入 MySQL 数据库和发送消息两种操作,要保证写库成功同时发送消息也成功,如果写库失败,消息也要取消发送。事务消息的实现一般是依赖两步提交策略。 已写库并发消息为例,首先客户端将消息发送到 Broker,Broker 收到消息后,给客户端返回一个确认信息。 这时消息在服务端是处于一种中间状态,消费者不可以消费这种状态的消息。 客户端收到确认消息后,执行写数据库的操作,写库成功后,向 Broker 再发送一个提交信息。 服务端收到提交信息后将消息更改就绪状态,允许消费者正常消费。 同时,生产者客户端还要提供一个回调方法,当 Broker 收到消息后,长时间没有收到确认信息时,调用客户端提供的回调方法进行回滚,如重置数据库。
6-3. 消息回放
有时,消费者可能会因为系统问题或其它原因,需要重新消费已经消费完的消息。大部分消息队列都可以实现这个功能,一般将次功能称为消息回放。要实现消息回放的功能,需要保证消息不会在消费成功之后立刻删除,而是保存一段时间后,根据一定策略,如一周后删除。同时,还需要对消费者当前消费的消费位置进行记录,RocketMQ 和 Kafka 都会通过一个 Offset 文件来记录消费者的消费位置,当消费者消费完成功,更新并提交 Offset。一般来说,Offset 文件中还需要记录最大消费位置,即已经入队的最新一条消息所在的位置和最小消费位置,即还没删除的最老的消息所在的位置。Offset 文件可以保存在服务端,也可以保存在客户端,也可以保存在 ZooKeeper 中,或者其它如 Redis 之类的第三方存储。 早期版本(0.9.0 之前)的 Kafka 是将消息保存在 ZooKeeper 中,之后为了减轻 ZooKeeper 的负担,将 Offset 保存到 Broker 对应的 topic 中。 RocketMQ 则支持有两种模式,默认是集群模式,topic 中的每条消息只会集群中的一个实例消费,这种模式由服务端管理 Offset,还有一种是广播模式,集群中的所有实例都会消费一份全量消息,这种模式由客户端管理 Offset。
7. 影响单机性能的因素
Kafka 和 RocketMQ 都是优秀的分布式消息系统,当需要服务于有较高高吞吐量要求的服务时,都可以通过扩容来解决需求。虽然如此,我们也不应放弃对单机吞吐量的追求,毕竟单机处理能力越高,意味着可以节省更多资源。而决定单机性能的因素,我能想到的主要有下面几个方面:
7-1. 硬件层面
- 硬盘:一般来说消息队列的瓶颈主要在磁盘 IO,更好的硬盘会带来更高的性能,正常情况性能由高到低排序为 NVMe >传统 SSD(Non-Volatile Memory express) >SAS >SATA。 对于 SAS 盘和 SATA 盘这种机械硬盘来说,还要看具体硬盘的转速。
- Raid 卡: Raid 卡的型号和性能,以及是否带有 Raid 卡缓存,Raid 卡的鞋策略是 WriteThough 还是 WriteBack 也会影响到服务的 I/O 性能进而影响到吞吐量。
7-2. 系统层面
- Raid 级别:当有 4 块盘时,Raid0,Raid5 和 Raid10 三种形式的 Raid 会对 I/O 性能造成不同的影响。Raid0 因为不需要任何其它操作,速度是最快的,几乎等于单盘写速度的四倍。Raid10 需要写一份数据和一份镜像,写性能是略小于单盘写速度的两倍的;Raid5 可以有三个盘提供写服务,另一个盘来存放校验和,由于计算校验和存在一定的性能损耗,写速度略小于单盘写速度的三倍,而且随着硬盘数量的增多,Raid5 计算校验和造成的开销会随之增大,如果没有 Raid 卡缓存支撑的话,在磁盘数量超过一定值后,性能是低于 Raid10 的。
- Linux I/O 调度算法:Linux 内核包含 Noop,Deadline,CFG, Anticipatory 四种 I/O 调度算法,需要结合应用特性和硬件选择合适的调度算法。 据说 SSD 硬盘更适合使用 Noop 算法。
- 文件系统的 block size:调整合适的文件系统的 block size 也会提高吞吐量。
- SWAP 的使用: SWAP 空间使用过程中会造成一定的 I/O 开销,如果内存充足的情况下,可以关闭 SWAP 功能。
7-3. 应用层面
- 文件读写的方式:一般来说,顺序读写速度远高于随机读写,且一次性读写的文件越大相对来说效率越高。应用可以据此来对文件结构和读写方式做一定优化。
- 缓存策略: 应用可以通过一定的缓存策略,提前将可能用到的数据读到内存中,当收到请求时,如果能命中缓存中的数据,在缓存中直接读取效率远高于读写磁盘。同样,写操作时也可以通过缓存将零散的写操作进行汇集,提高写操作的效率。 所有适合的缓存策略将显著提高 Broker 的处理能力。
作者简介
赵晓磊,京东集团基础架构部运维工程师,关注消息队列。
[source]漫谈消息队列:以Kafka和RocketMQ为例