会出现问题总结
消息队列造成系统可用性降低
系统复杂性增加
消息丢失问题
消息重复消费
消费顺序问题
会出现问题总结
消息队列造成系统可用性降低
系统复杂性增加
消息丢失问题
消息重复消费
消费顺序问题
千万注意!使用消息队列时一定要考虑这些问题,否则可能会造成不可估量的后果!!
问题解决
消息队列造成系统可用性降低
这个很好理解,也相对很好解决。就是原本好好的项目,现在加了一层中间件,如果中间件挂掉了那么系统也会崩了。因此可以说消息队列降低了系统的可用性。
为了提高系统的可用性,最好的办法就是搭建集群。RabbitMQ的集群不同于RocketMQ,Kafka的分布式架构,使用的是主从架构。集群搭建的模式有两种,分别是也有普通集群和镜像集群模式。
集群不是这篇博客的重点,有想了解的小伙伴们可以单独去搜MQ集群相关的博客就OK啦~
系统复杂性增加
这里应该很好理解,就是说原本没用MQ的时候很多东西都不用考虑,用了MQ之后就需要考虑很多很多问题,消息丢失啊,重复消费啊,消息积压呀等等等等。。。但是欲戴皇冠,必承其重的道理想必大家也都很清楚。
消息丢失问题
不用MQ还好,用了MQ如果不考虑消息丢失问题,可能会给公司带来不可预估的财产损失,所以大家使用的时候也需要斟酌。在网络环境中可能会遇到各种不可估计的错误导致数据丢失,接下来从三个角度来介绍防止消息丢失的一些解决办法。
生产者数据丢失
真实开发中会有两种方式解决生产者数据丢失的情况。
第一种:使用了事务的机制,生产者发送消息前开启事务channel.txSelect(),开启事务后去发送消息,如果发送消息出现异常事务就会回滚,channel.txRollback(),如何不发生异常的话,事务就会提交成功发送消息channel.txCommit()。然而因为用到了事务机制,程序的效率肯定会受到影响,因此有了第二种解决办法。
第二种:第二种解决生产者消息丢失的方法是confirm确认模式。这个模式的逻辑也很简单,就是一旦channel进入了confirm模式,发布的消息都会被指派一个唯一的ID,从1开始。如果RabbitMQ收到了消息,则会返回一个消息唯一ID的ACK给生产者,如果没有收到则会返回一个NACK给生产者,生产者通过这个辨别消息队列是否收到了消息。
消息队列数据丢失
当消息成功抵达消息队列后,消息的生产者的任务算是结束了。这时就该考虑如果消息队列把消息弄丢了咋整。
前人栽树,后人乘凉。这些问题很久前就已经有了很好的解决办法。
这里解决消息队列丢失消息的方法就是持久化机制搭配着confirm确认模式来一起使用,也就是当消息队列接收到生产者消息,并且持久化成功后才会返回ACK消息给生产者,否则返回NACK消息,这样一来,如果持久化失败,生产者也会自动重发消息,如果持久化成功,就不用担心消息队列将消息丢失掉啦。就算MQ挂掉了,重启一下也会将消息回复回来。
关于持久化消息的操作很简单:
1、将queue的持久化开关打开,durable设置为true,代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
消费者数据丢失
消费者消息丢失一般是因为使用的是自动确认消息模式,这种模式下消费者会自动确认收到消息,此时MQ会立即删除消息。这种情况下如果消费者消费出现了异常,消费者就会丢失掉消息。
知道了原因,解决起来便很方便,即不采用自动确认消息模式,更换成手动确认消息的模式。
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
在SpringBOOT中手动确认消息的配置如下
// spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
1
2
消息被重复消费
防止消息的重复消费换句话说就是保证消息的幂等性
这里简单说一下幂等性的意思就是对相同资源的一次请求或者多次请求得到的结果是一致的
这个问题时消息队列的基本问题,用来考察你的能力,也就是说可以结合实际场景来答,没有固定的答案的。
不管哪种消息队列都有可能出现重复消费的情况,但是大多处理的形式都差不多,就是消费者消费了之后会发一个消费成功的信号给MQ,然后MQ会去删除消息,防止了重复消费。不同的MQ有不同的做法而已。
例如RabbitMQ是发一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念, 就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
至于重复消费的原因也很多很多,比如网络故障导致MQ无法收到消费成功的返回消息等等。
如何解决这个问题?这个可以针对不同的业务场景来回答。
如果使用这个消息去进行一个插入的INSERT操作
这样的场景很好解决,加一个唯一的主键,再次消费的时候就会出现主键冲突防止了出现数据库的脏数据。
如果使用这个消息去做一个Redis的Set操作
这就不用担心了,本身Redis的set操作就是幂等的,set几次的结果都是一样。
如果还有其他需求的话?
可以做一个中间的记录介质的存在,来记录每一个消息的消费情况,就像使用redis的K-V结构将消息的消费情况存在redis中去。
消息积压过多
一般这种情况发生在消费者服务挂掉了,导致没办法去消费,从而消息队列里的数据大量积压。如果不是热门数据还好,就怕是热门数据,一下积压了很多很多的消息,很让人头疼。
解决办法首当其冲的是先修复好consumer的问题然后再考虑怎么去处理积压的消息,如果消费者仍然在挂掉的状态,那么消息积压只会越来越多。
解决好了消费服务器的问题之后就开始着手处理积压的消息了
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据
消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
这里还会有一些问题,比如消息积压时间太长导致超时了,有些会被MQ自动清理掉,那么这些消息就彻底消失了。
这里可以写一个临时程序,将丢掉的那些消息再一点点查出来然后批量导入到消息队列中去。