RabbitMQ
# RabbitMQ
# 消息队列了解多少
tag:
携程、用友count:2
# RabbitMQ 有几种工作模式?
- 简单模式:一个生产者,一个消费者
- work 模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
- 订阅模式:一个生产者发送的消息会被多个消费者获取。
- 路由模式:发送消息到交换机并且要指定路由 key ,消费者指定路由 key 将队列绑定到交换机
- topic 模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上 RPC
# 保证消息的不丢失
在我们物流项目中的支付模块中的支付方式的模板做了缓存需要达到 mysql,redis 双写一致性,我们使用 RabbitMq 来实现的,此时我们就需要把证 Rabbit 中数据的不丢失。
我们需要从三个方面进行考虑:
开启生产者的确认机制,成功返回 ack,失败则返回 nack,包括 publisher-confirm 机制和 publisher-return 机制,前者作用在生产者发送消息到交换机和消费者消费消息,后者作用在交换机发送消息到消息队列中。
开启持久化功能,设置交换机(在创建交换机的函数中可以设置为持久化),消息队列(通过 queueBuilder 中设置持久化),消息的持久化(可以在创建消息时设置其模式)。
开启消费者的确认机制,设置消费者的确认机制的模式为 auto,在 spring 处理消息成功后返回 ack(此时就会将消息从消息队列中删除),反之返回 nack,我们可以通过 spring 的 retry 机制设置重试次数,在我们的项目中设置为三次,如果三次都失败了,则直接将消息发送到异常队列中,人工去处理异常的消息。
# 重复消费
因为网络的问题,消费者没有及时的发送 ack,然后此时消费者服务又宕机了,在服务重启后就会重复消费消息。
解决方案:
为每条消息设置一个唯一的 Id 标识,当消费者在消费消息的时候会先判断 id 在数据库中是否存在,如果不存在就消费信息,如果将 id 存储数据库中,如果存在则就不消费消息。
使用分布式锁,其性能比较低。
# 死信队列
延迟队列的实现方式就是:死信队列 + TTL
在我们的消息队列中通过属性
dead-letter-exchange设置死信交换机。自定义死信交换机和绑定对应的自定义的死信消息队列。给需要延迟发送的消息设置 ttl。当消息队列中的消息过期时,消息被拒接消费时,队列满之后,消息就会进入私死信队列中。
我们就可以设置一个消息队列用于存储需要延迟的消息,当消息过期后直接进入死信队列中,消费者去消费死信队列中的消息达到延迟处理的效果。我们项目中就是这么实现的。
还有延迟队列插件
# 消息堆积该怎么解决呢
消息堆积的原因是因为消费信息的速度小于生成消息的速度,所以我们可以多增加几个消费者。
在消费者中开启线程池加快消费速度。
找到消息队列的容量。通过间消息队列设置为惰性队列,造创建队列的时候使用 lazy 方法实现。
惰性队列的优点:存储空间大。缺点:因为消息都是存储在磁盘上的,所以需要做 IO 操作,效率比较低。