0%

RocketMQ

⭐应用场景

异步处理

比如某些业务请求需要3个步骤才能完成,如果前2个步骤完成之后,就已经可以保证业务可以正常完成,就可以直接返回给客户端响应结果,然后把第三个步骤通过消息队列发送给消费者,由消费者来完成后续的流程。这样就可以提高服务的响应速度。

比如我们系统中的某些发行业务都是需要请求确认2个步骤才能完成,如果每一个请求都需要把这2个步骤执行完才给用户响应结果的话,就会导致处理请求的时间变长。如果想要提高程序处理请求的速度,就可以在完成第1个步骤之后,直接给用户返回结果,然后把确认消息放到消息队列中,由消费者完成后续的步骤。

流量控制

通过消息队列把前端和后端隔离起来,后端按照自己的处理能力从消息队列中处理请求。

这样就可以避免大量的请求直接冲击到后端,可以减少后端服务的压力。

这种方案的缺点是,消息队列会导致总体的响应时间变长,增加系统的复杂度。

服务解耦

在微服务架构中,有些下游服务需要获取上游服务的数据,但是下游服务是会增加或者减少的的,如果每次增减下游服务,都需要调整上游服务的接口,这样服务之间的耦合度就太高了。

可以通过消息队列让服务之间解耦,上游服务可以把数据发送到消息队列中,下游服务只需要订阅这个消息队列就可以实时获得数据。无论增加还是减少下游服务,上游服务都不需要修改。

如何选择消息队列

RocketMQ 对响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,对于响应时延有要求的业务场景,那应该选择使用 RocketMQ

Kafka在设计上采用的是批量和异步的思想,这种设计让Kafka能做到超高的性能。但是带来的问题是,它同步收发消息的响应时延比较高,因为当生产者发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送。所以,Kafka更适合离线类的场景

也可以通过linger_ms_configbatch_size_config这两个参数控制发送消息的时机。

batch.size:用来控制单次提交的字节数大小,默认是16k。当缓冲区中的数据达到16k时,就会触发一次提交。

linger_ms:用来控制两次提交的时间间隔,如果两次提交的时间间隔达到了这个阈值,不管batch有没有满,都会发送出去。

⭐消息模型

RocketMQ使用的消息模型是发布-订阅模型。

可靠性

RocketMQ是通过请求-确认机制,来保证消息传递的可靠性。

具体的逻辑是这样的:

  • 生产者先将消息发送给Broker,Broker收到消息会把消息写进主题和队列中,然后给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到了失败的响应,就表示消息发送失败。

  • 消费者在收到消息并完成自己的消费业务逻辑后,也会给服务端发送确认消息,服务端只有收到消费者的确认后,才认为一条消息被消费成功。否则消费者下次拉取的数据还是同一条消息。

有序性

RocketMQ只在队列层面保证有序性,主题层面是无法保证有序性的,在同一个队列中,必须要等前一条消息被成功消费,才能继续消费下一条消息。

也就是说同一个队列,同一时刻最多只能有一个消费者进行消费,所以如果消费者的节点数量大于队列的数量,多出来的消费者是没有意义的。

在RocketMQ中每个消费者组都会消费主题中一份完整的消息,不同消费组之间互不影响,也就是说一条消息被消费组A消费过,还会再给消费组B消费。同一个组内的消费者是竞争关系,如果一条消息被一个消费者消费了,组内其他消费者就不会再收到这条消息了。

消息需要被不同的组进行多次消费,所以消息被消费后不会被删除。RocketMQ会为每个消费者组维护一个offset,每成功消费一条消息,就把对应的offset就+1。

⭐RocketMQ分布式事务

逻辑是这样的:

  1. 首先在消息队列上开启一个事务,然后给消息队列发送一个“事务消息”,这个事务消息在事务提交之前,对消费者是不可见的。
  2. 事务消息发送成功之后,就可以开始执行本地事务了。
  3. 然后根据本地事务的执行结果决定是提交事务或者回滚事务,这样就基本实现了 “要么都成功,要么都失败”的一致性要求。

但是还有一个问题是提交事务的消息有可能发送失败,所以RocketMQ增加了事务反查的机制来解决事务消息提交失败的问题。具体来说就是,如果Producer在提交或者回滚事务消息的时候发生了异常,Broker没有收到事务消息,这时候Broker就会定期去Producer上面反查这个事务对应的本地事务的状态,然后根据反查结果来决定提交或者回滚事务

为了支持事务反查机制,还需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是否成功

比如根据消息中的主键ID,在数据库中查询这条记录是否存在,如果存在就返回成功,否则返回失败。RocketMQ会根据事务反查的结果来提交或者回滚事务。

这个反查本地事务的实现,并不依赖消息的发送方,就算发送事务消息的那个服务宕机了,RocketMQ也可以通过其他实例节点来执行反查,确保事务的完整性。

⭐事务实现原理

RocketMQ的事务是基于两阶段提交来实现的。

  1. 首先在Producer端,首先要重写TransactionListener中的两个方法:executeLocalTransaction()checkLocalTransaction()

    • **excuteLocalTransaction()**:是用来执行本地事务的,比如可以把数据插入到数据库中,并返回执行结果。
    • **checkLocalTransaction()**:是用来做事务反查的,比如可以判断数据库中是否存在刚刚插入的数据,如果存在就提交,如果不存在,就可能是本地事务执行失败了,就回滚。
  2. 然后,在发送消息的时候,先给待发送的消息添加一个属性PROPERTY_TRANSACTION_PREPARED,用来声明这是一个事务消息,然后把这条消息发送给Broker。

  3. 如果发送成功了,就会开始调用TransactionListener的实现类,然后执行executeLocalTransaction()这个方法来跑本地事务。

  4. 最后根据事务消息发送的结果和本地事务的执行结果,来决定是提交或者回滚事务。

  5. Broker收到消息的时候,会判断一下这条消息是普通消息还是事务消息。如果是普通消息,就直接把消息放到指定的队列里面。如果是事务消息,会先记录一下指定的主题和队列,然后把消息保存在一个特殊的内部主题。这个主题对消费者是不可见的,这样就保证了事务在提交之前,这个事务消息消费者是无法消费的。

  6. RocketMQ还会启动一个定时器,定时从事务消息队列中读取所有长时间未提交的事务消息,然后调用事务反查接口,根据返回的结果,来决定这个事务消息是提交还是回滚。

  7. 如果需要提交事务,那就把事务消息从这个特殊队列里面移动到真正的主题和队列中去。如果是回滚事务,就直接结束删除这个事务消息。

⭐如何保证消息不丢失

判断是否有消息丢失,可以利用消息队列的有序性来验证是否有消息丢失,逻辑是这样的:

  1. 在Producer端,给每个发出去的消息附加一个连续递增的序号。
  2. 然后在Consumer端来检查这个序号的连续性
  3. 如果没有消息丢失,那Consumer接收到的消息必然是连续递增的,如果检测到序号不连续,就说明丢消息了。还可以通过丢失的序号确定是哪条消息丢失了,方便排查原因。

大部分的消息队列一般都会有拦截器机制,可以利用拦截器,在Producer发送消息之前,在拦截器中将序号注入到消息里面

但是像Kafka和RocketMQ这样的消息队列,它并不保证Topic层面的顺序,只能保证partition上的消息是有序的,所以在发送消息的时候需要指定分区,并且每个分区都要单独校验序号的连续性。

如果Producer是有多个实例的,也需要每个Producer分别生成各自的序号,并且还需要加上Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。

一条消息的传递主要分三个阶段:Producer阶段Broker阶段Consumer阶段

Producer阶段

在Producer端,消息队列通过请求-确认机制,来保证消息的可靠传递。

具体来说就是,Producer向Broker发送一条消息时,Broker收到消息之后会给Producer返回一个确认响应,表示消息已经收到了。只要Producer收到了Broker的确认响应,就可以保证消息在Producer阶段不会丢失。

但是有些消息队列在长时间没收到确认响应,会自动重试,如果重试失败就会以返回值或者抛异常的方式告诉用户消息发送失败。所以在写代码的时候,需要注意处理下返回值或者捕获异常,就可以保证这个发送阶段消息不会丢失

以Kafka为例,同步发送:

1
2
3
4
5
6
7
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println(" 消息发送成功。");
} catch (Throwable e) {
System.out.println(" 消息发送失败!");
System.out.println(e);
}

异步发送:

1
2
3
4
5
6
7
8
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println(" 消息发送成功。");
} else {
System.out.println(" 消息发送失败!");
System.out.println(exception);
}
});

Broker阶段

在Broker阶段,一般情况下,只要Broker正常运行,就不会出现丢消息的情况,但是如果Broker出现故障,比如进程死掉,或者服务器宕机,就可能会丢消息。

如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机导致丢消息

如果是单个节点的Broker,可以配置一下Broker的参数,比如RocketMQ可以把flushDiskType配置为SYNC_FLUSH同步刷盘。这样Broker在收到一条消息后,会先把消息写到磁盘后,再给Producer返回确认响应。这样就算发生了宕机,因为消息已经被写到了磁盘上,所以是不会丢消息的。

如果Broker是由多个节点组成的集群,需要把Broker配置成:至少把消息发送到2个以上的节点,然后再给Producer发送确认响应,这样当某个Broker宕机后,其他的Broker可以替代宕机的Broker,也不会丢消息。

Consumer阶段

Consumer阶段也是通过请求-确认机制来保证消息的可靠传递。

具体来说就是,Consumer从Broker拉取一条消息后,开始执行消费业务的逻辑,执行成功后,向Broker发送消费确认的响应,如果Broker没有收到消费确认的响应,下次拉取的消息还会是同一条,这样就可以保证消息不会在网络传输过程中丢失,也不会因为消费逻辑出错导致数据丢失。

所以在写代码的时候需要注意,不要在收到消息后立刻就发送消费确认,而是应该在执行完消费业务的逻辑之后,再发送消费确认的响应

比如应该先把消息保存到数据库之后,再发送消费确认的响应。这样就算保存消息失败了,因为没有执行消费确认的代码,下次拉取的还是这条消息。

⭐如何处理重复消息

在MQTT协议里面,有三种传递消息的标准:

  • At most once:在传递消息时,消息最多会被发送一次,换一个说法就是,没什么消息可靠性保证,允许丢消息,一般都是一些对消息可靠性要求不高,可以接收数据少量丢失的场景用。
  • At least once:在传递消息时,消息最少会被发送一次,也就是说,不允许丢消息,但是允许有少量的重复消息出现。
  • Exactly once:在传递消息时,消息只会被发送一次,不允许丢失,也不允许重复,这个是最高的等级。

这三种标准对所有的消息队列都是适用的,现在常用的大部分消息队列提供的标准都是At most once,比如RocketMQ、Kafka、RabbitMQ都是这样。也就是说这些消息队列都有重复消息的可能。

一般解决重复消息的办法是让消费消息的接口具备幂等性

幂等常见的实现方式主要有2种方案:通过数据库唯一约束实现幂等通过前置条件实现幂等

幂等(Idempotence):一个幂等的方法,使用相同的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的,所以幂等的方法不用担心重复数据会对系统造成影响。

比如:一个方法可以把张三的账户余额设置为100元,执行一次对系统产生的影响是,把张三的账户余额设置成了100元,只要提供的参数100元不变,就算执行多少次,张三的账户余额始终都是100元,这就是一个幂等操作。

通过数据库唯一约束实现幂等

可以在表中定义唯一约束来实现幂等,如果入库时发生了唯一键冲突,就表示这条记录已经消费过了,可以直接丢弃。

通过前置条件实现幂等

就是给修改数据设置一个前置条件,如果满足条件就更新数据,否则就拒绝更新。

可以给数据增加一个版本号,每次更新数据前比较一下当前数据的版本号和消息中的版本号是否一致,如果不一致就拒绝更新。更新数据后同时把版本号+1,这样也可以实现幂等。

⭐消息积压如何处理

一般出现消息积压的主要原因是,生产者发送消息的速度大于消费者处理消息的速度

如果只是短时间内有大量的请求发送到后端,这种情况消息积压是正常的现象,因为这些积压的消息会被逐渐消费掉。如果消息队列中一直积压了大量的数据,就需要排查问题了:

  1. 首先可以排查日志,看是不是消费逻辑报错,导致同一条消息反复消费。
  2. 排查消费端和Broker之间的网络状态是否正常
  3. 扩容,增加partition和consumer节点,来提高消费端的处理能力。
  4. 优化消费逻辑,如果代码执行到某一个步骤就可以确定消息可以被正确处理,就可以提前给Broker发送消费成功的响应,然后把后续流程交给其它线程进行异步处理。