0%

Kafka

⭐应用场景

异步处理

比如某些业务请求需要3个步骤才能完成,如果前2个步骤完成之后,就已经可以保证业务可以正常完成,就可以直接返回给客户端响应结果,然后把第三个步骤通过消息队列发送给消费者,由消费者来完成后续的流程。这样就可以提高服务的响应速度。

比如我们系统中的某些发行业务都是需要请求确认2个步骤才能完成,如果每一个请求都需要把这2个步骤执行完才给用户响应结果的话,就会导致处理请求的时间变长。如果想要提高程序处理请求的速度,就可以在完成第1个步骤之后,直接给用户返回结果,然后把确认消息放到消息队列中,由消费者完成后续的步骤。

流量控制

通过消息队列把前端和后端隔离起来,后端按照自己的处理能力从消息队列中处理请求。

这样就可以避免大量的请求直接冲击到后端,可以减少后端服务的压力。

这种方案的缺点是,消息队列会导致总体的响应时间变长,增加系统的复杂度。

服务解耦

在微服务架构中,有些下游服务需要获取上游服务的数据,但是下游服务是会增加或者减少的的,如果每次增减下游服务,都需要调整上游服务的接口,这样服务之间的耦合度就太高了。

可以通过消息队列让服务之间解耦,上游服务可以把数据发送到消息队列中,下游服务只需要订阅这个消息队列就可以实时获得数据。无论增加还是减少下游服务,上游服务都不需要修改。

如何选择消息队列

RocketMQ 对响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,对于响应时延有要求的业务场景,那应该选择使用 RocketMQ

Kafka在设计上采用的是批量和异步的思想,这种设计让Kafka能做到超高的性能。但是带来的问题是,它同步收发消息的响应时延比较高,因为当生产者发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送。所以,Kafka更适合离线类的场景

也可以通过linger_ms_configbatch_size_config这两个参数控制发送消息的时机。

batch.size:用来控制单次提交的字节数大小,默认是16k。当缓冲区中的数据达到16k时,就会触发一次提交。

linger_ms:用来控制两次提交的时间间隔,如果两次提交的时间间隔达到了这个阈值,不管batch有没有满,都会发送出去。

⭐消息模型

Kafka使用的消息模型是发布-订阅模型。

可靠性

Kafka是通过请求-确认机制,来保证消息的可靠性。

具体来说就是:

  • 生产者先将消息发送给Broker,Broker收到消息会把消息写进topic的partition中,然后给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到了失败的响应,就表示消息发送失败。

  • 消费者在收到消息并完成消费业务逻辑后,也会给Broker发送确认响应,Broker收到消费者的确认后,才会认为消息被消费成功。否则消费者下一次拉取到的数据还是同一条消息。

有序性

Kafka只在partition层面保证有序性,topic层面是无法保证有序性的,在同一个partition中,前一条消息被成功消费,才能继续消费下一条消息。

也就是说同一个partition,同一时刻最多只能有一个消费者进行消费,所以如果消费者的节点数量大于partition的数量,多出来的消费者是没有意义的。

在Kafka中每个消费者组都会消费主题中一份完整的消息,不同消费组之间互不影响,也就是说一条消息被消费组A消费过,还会再给消费组B消费。同一个组内的消费者是竞争关系,如果一条消息被一个消费者消费了,组内其他消费者就不会再收到这条消息了。

消息需要被不同的组进行多次消费,所以消息被消费后不会被删除。Kafka会为每个消费者组维护一个offset,每成功消费一条消息,就把对应的offset就+1。

⭐Kafka分布式事务

逻辑是这样的:

  1. 首先在消息队列上开启一个事务,然后给消息队列发送一个“事务消息”,这个事务消息在事务提交之前,对消费者是不可见的。
  2. 事务消息发送成功之后,就可以开始执行本地事务了。
  3. 然后根据本地事务的执行结果决定是提交事务或者回滚事务,这样就基本实现了 “要么都成功,要么都失败”的一致性要求。

但是最后提交事务的消息有可能会发送失败,Kafka的解决方案比较简单,就是直接抛异常,让用户自己处理,我们可以在业务代码中重试,或者回滚之前的操作。所以在写代码的时候,调用send方法时一定要捕获异常。

⭐事务实现原理

Kafka中的事务是基于两阶段提交来实现的。

Kafka引入了事务协调者来负责管理服务端的事务,这个事务协调者是Broker的一部分。

Kafka中有一个特殊的Topic,用来记录事务日志。在Kafka集群中,可以存在多个协调者,每个协调者负责管理事务日志中的几个分区,这样是为了并行执行多个事务。

具体的逻辑是这样的:

  1. 在开启事务的时候,生产者会给协调者发送一个开启事务的请求,协调者会在事务日志中记录下事务ID。

  2. 然后,Producer在发送消息之前,还要给协调者发送请求,告诉消息属于哪个topic和哪个partition。这些信息会被协调者记录在事务日志里面。

  3. 然后就可以正常发送消息了,Kafka会在Consumer端过滤掉未提交的事务消息。

  4. 消息发送完成后,Producer会给协调者发送提交或者回滚事务的请求,然后协调者就会开始两阶段提交

    • 第一阶段:协调者会把事务的状态设置为prepare预提交,然后写到事务日志里。
    • 第二阶段:协调者会写一条“事务结束”的消息并把事务状态改为 “已提交”。Consumer读到事务提交的消息之后,就会去消费对应的事务消息。
  5. 最后协调者会记录最后一条事务日志,表示这个事务已经结束了。

⭐Kafka高性能IO原理

  1. 批处理消息,减少网络通信的开销
  2. 顺序读写磁盘,减少磁盘寻址时间
  3. 利用PageCache加速消息的读写
  4. 零拷贝

批量收发消息减少网络通信的开销

在Kafka内部消息都是批量处理的:

  • 在生产端,虽然Kafka的Producer只提供了单条发送的API,实际上Kafka采用了批量和异步的发送机制。

    • 调用send()方法时,不管是同步发送还是异步发送,Kafka都不会立刻把这条消息发送出去,而是放到内存中缓存起来,然后在合适的时候一次性把缓存中的数据全部发送出去。这样可以减少网络通信的次数。

      可以通过linger_ms_configbatch_size_config这两个参数控制发送消息的时机。

      batch.size:用来控制单次提交的字节数大小,默认是16k。当缓冲区中的数据达到16k时,就会触发一次提交。

      linger_ms:用来控制两次提交的时间间隔,如果两次提交的时间间隔达到了这个阈值,不管batch有没有满,都会发送出去。

  • 在消费端,消息也是批量处理的,Consumer拉到一批消息后,会把消息拆分开,再一条一条发送给我们的程序来处理。

利用顺序读写提升磁盘IO性能

磁盘有一个特性,就是顺序读写的性能要比随机读写快好几倍。

因为操作系统每次从磁盘读写数据的时候,需要先寻址,也就是找到数据在磁盘上的物理位置,然后再进行数据读写。

顺序读写比随机读写省去了大部分的寻址时间,因为它只要一次寻址,就可以连续的写下去,所以性能会好很多。

Kafka利用了磁盘顺序读写速度很快的特点,对于每个分区,它从Producer接收的消息,会顺序的写入到对应的log文件中,一个文件写满了,就开启一个新的文件这样顺序写下去

消费的时候也是从某个log文件中的某个位置,顺序的把消息读出来。

利用PageCache加速消息读写

Kafka还会利用PageCache加速消息的读写。

PageCache是操作系统都有的一个特性,可以简单的理解为PageCache就是操作系统在内存中给磁盘上的文件建立的缓存,读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际上操作的是PageCache,也就是文件在内存中缓存的副本。

PageCache有两种情况:

  • 一种情况是PageCache中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间。
  • 另一种情况是PageCache中没有数据,这个时候应用程序会被阻塞,操作系统会把数据从文件复制到PageCache中,这种情况下,才会真正的读一次磁盘上的文件,这样就比较慢。

应用程序用完某块PageCache后,操作系统并不会立刻就清理PageCache中的数据,除非系统内存不够用,操作系统才会去清理一部分数据。

零拷贝技术(Zero Copy)

Kafka还使用了一种“零拷贝”的操作系统特性来提升消费性能。

在Broker中,一条消息的消费的逻辑大概是这样的:

  1. 首先从文件中把数据复制到PageCache中,如果PageCache中已经有数据了,这一步是可以省略的。

    在Kafka中,消息被写入到Broker中,通常很快就会被消费掉,像这种刚刚写入到PageCache中的数据,很快就会被消费者消费掉,PageCache命中率会非常高

  2. 然后从PageCache中把数据复制到内存中。

  3. 最后从内存中把数据复制到Socket缓冲区里面,最后发送给客户端。

Kafka使用“零拷贝”技术可以把这个复制次数减少一次,直接从PageCache里面把数据复制到Socket缓冲区里面,速度会很快。

⭐如何保证消息不丢失

判断是否有消息丢失,可以利用消息队列的有序性来验证是否有消息丢失,逻辑是这样的:

  1. 在Producer端,给每个发出去的消息附加一个连续递增的序号。
  2. 然后在Consumer端来检查这个序号的连续性
  3. 如果没有消息丢失,那Consumer接收到的消息必然是连续递增的,如果检测到序号不连续,就说明丢消息了。还可以通过丢失的序号确定是哪条消息丢失了,方便排查原因。

大部分的消息队列一般都会有拦截器机制,可以利用拦截器,在Producer发送消息之前,在拦截器中将序号注入到消息里面

但是像Kafka和RocketMQ这样的消息队列,它并不保证Topic层面的顺序,只能保证partition上的消息是有序的,所以在发送消息的时候需要指定分区,并且每个分区都要单独校验序号的连续性。

如果Producer是有多个实例的,也需要每个Producer分别生成各自的序号,并且还需要加上Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。

一条消息的传递主要分三个阶段:Producer阶段Broker阶段Consumer阶段

Producer阶段

在Producer端,消息队列通过请求-确认机制,来保证消息的可靠传递。

具体来说就是,Producer向Broker发送一条消息时,Broker收到消息之后会给Producer返回一个确认响应,表示消息已经收到了。只要Producer收到了Broker的确认响应,就可以保证消息在Producer阶段不会丢失。

但是有些消息队列在长时间没收到确认响应,会自动重试,如果重试失败就会以返回值或者抛异常的方式告诉用户消息发送失败。所以在写代码的时候,需要注意处理下返回值或者捕获异常,就可以保证这个发送阶段消息不会丢失

以Kafka为例,同步发送:

1
2
3
4
5
6
7
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println(" 消息发送成功。");
} catch (Throwable e) {
System.out.println(" 消息发送失败!");
System.out.println(e);
}

异步发送:

1
2
3
4
5
6
7
8
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println(" 消息发送成功。");
} else {
System.out.println(" 消息发送失败!");
System.out.println(exception);
}
});

Broker阶段

在Broker阶段,一般情况下,只要Broker正常运行,就不会出现丢消息的情况,但是如果Broker出现故障,比如进程死掉,或者服务器宕机,就可能会丢消息。

如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机导致丢消息

如果是单个节点的Broker,可以配置一下Broker的参数,比如RocketMQ可以把flushDiskType配置为SYNC_FLUSH同步刷盘。这样Broker在收到一条消息后,会先把消息写到磁盘后,再给Producer返回确认响应。这样就算发生了宕机,因为消息已经被写到了磁盘上,所以是不会丢消息的。

如果Broker是由多个节点组成的集群,需要把Broker配置成:至少把消息发送到2个以上的节点,然后再给Producer发送确认响应,这样当某个Broker宕机后,其他的Broker可以替代宕机的Broker,也不会丢消息。

Consumer阶段

Consumer阶段也是通过请求-确认机制来保证消息的可靠传递。

具体来说就是,Consumer从Broker拉取一条消息后,开始执行消费业务的逻辑,执行成功后,向Broker发送消费确认的响应,如果Broker没有收到消费确认的响应,下次拉取的消息还会是同一条,这样就可以保证消息不会在网络传输过程中丢失,也不会因为消费逻辑出错导致数据丢失。

所以在写代码的时候需要注意,不要在收到消息后立刻就发送消费确认,而是应该在执行完消费业务的逻辑之后,再发送消费确认的响应

比如应该先把消息保存到数据库之后,再发送消费确认的响应。这样就算保存消息失败了,因为没有执行消费确认的代码,下次拉取的还是这条消息。

⭐如何处理重复消息

在MQTT协议里面,有三种传递消息的标准:

  • At most once:在传递消息时,消息最多会被发送一次,换一个说法就是,没什么消息可靠性保证,允许丢消息,一般都是一些对消息可靠性要求不高,可以接收数据少量丢失的场景用。
  • At least once:在传递消息时,消息最少会被发送一次,也就是说,不允许丢消息,但是允许有少量的重复消息出现。
  • Exactly once:在传递消息时,消息只会被发送一次,不允许丢失,也不允许重复,这个是最高的等级。

这三种标准对所有的消息队列都是适用的,现在常用的大部分消息队列提供的标准都是At most once,比如RocketMQ、Kafka、RabbitMQ都是这样。也就是说这些消息队列都有重复消息的可能。

一般解决重复消息的办法是让消费消息的接口具备幂等性

幂等常见的实现方式主要有2种方案:通过数据库唯一约束实现幂等通过前置条件实现幂等

幂等(Idempotence):一个幂等的方法,使用相同的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的,所以幂等的方法不用担心重复数据会对系统造成影响。

比如:一个方法可以把张三的账户余额设置为100元,执行一次对系统产生的影响是,把张三的账户余额设置成了100元,只要提供的参数100元不变,就算执行多少次,张三的账户余额始终都是100元,这就是一个幂等操作。

通过数据库唯一约束实现幂等

可以在表中定义唯一约束来实现幂等,如果入库时发生了唯一键冲突,就表示这条记录已经消费过了,可以直接丢弃。

通过前置条件实现幂等

就是给修改数据设置一个前置条件,如果满足条件就更新数据,否则就拒绝更新。

可以给数据增加一个版本号,每次更新数据前比较一下当前数据的版本号和消息中的版本号是否一致,如果不一致就拒绝更新。更新数据后同时把版本号+1,这样也可以实现幂等。

⭐消息积压如何处理

一般出现消息积压的主要原因是,生产者发送消息的速度大于消费者处理消息的速度

如果只是短时间内有大量的请求发送到后端,这种情况消息积压是正常的现象,因为这些积压的消息会被逐渐消费掉。如果消息队列中一直积压了大量的数据,就需要排查问题了:

  1. 首先可以排查日志,看是不是消费逻辑报错,导致同一条消息反复消费。
  2. 排查消费端和Broker之间的网络状态是否正常
  3. 扩容,增加partition和consumer节点,来提高消费端的处理能力。
  4. 优化消费逻辑,如果代码执行到某一个步骤就可以确定消息可以被正确处理,就可以提前给Broker发送消费成功的响应,然后把后续流程交给其它线程进行异步处理。