本文共 3530 字,大约阅读时间需要 11 分钟。
消息队列重复消费问题
如何保证消息不被重复消费?(如何保证消息消费时的幂等性) 其实这是一个很常见的问题,这两个问题基本上可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费也别造成系统异常可以吗?这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里面需要考虑的一个问题首先介绍一下什么叫做消息重复消费的问题:
kafka实际上有个 offset 的概念,就是每个消息写进去都有一个 offset ,这个代表他的序号,然后 consumer 消费了数据之后,每隔一段时间就会把自己消费过的消息的 offset 提交一下,代表我已经消费过了,下次要是我重启啥的,你就让我继续从上次消费的 offset 来继续消费 但是凡事都有意外,比如我们生产经常遇到的就是你的有时候重启系统,这里面看你的重启方式,要是有时候重启的比较急直接把进程 kill 了,再重启。这会导致 consumer 有些消息处理了但是还没有提交 offset ,这个时候会导致消息再消费一次重复消费问题不大,怕的是你没有考虑到重复消费后,怎么保证幂等性的。举个例子:假设你有个系统,消费一条数据就往数据库插入一条,要是你的一个消息重复插入两次,你不就插入两条数据了,这个数据不就有问题了?要是你第二次消费的时候,自己判断一下已经消费过了直接扔了,就能保证只有一条数据了 幂等性通俗点来说,就是一个数据或者一个请求给你重复来多少次,你都得保证对应的数据是不会发生改变的,不能出错。所以第二个问题来了,怎么保证消息队列的幂等性?
其实还是的结合业务来思考,这里有几个思路:
比如你要拿个数据写库,你先根据主键查询一下,如果这数据有了,就不要插入了,就update一下
如果是写redis,那没有关系,每次都是set天然的幂等
如果不是上面两个场景,实际稍微复杂一点,你需要让生产者发送消息的时候,每条数据加上一个全局的唯一id,类似于订单id之内的东西,然后你这里消息到了之后,先根据这个id去redis里面查询一下,判断一下之前有消费过吗?如果没有消息就进行处理,然后把id写入redis。如果消费过来,就不要进行处理,保证别重复处理相同消息即可
具体如何保证MQ的消息幂等,还得的结合具体业务进行分析
如何保证消息可靠性传输(如何处理消息丢失问题)?
分析:消息丢失是肯定存在的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能对就是说的重复消费和幂等性维妮塔,不能少说的是数据别丢失了,这个问题是使用 MQ 必须要考虑的 这个数据丢失一般分为两种,要么是 MQ 自己丢了,要么就是我们消息的时候丢了。我们从 rabbitMQ 和 kafka 分别来分析一下RabbitMQ:
生产者弄丢了数据
生产者发送数据到 RabbitMQ ,可能数据再半路就丢失了,因为网络的问题,都有可能 此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitQ 事务( channel .txSelect ),然后发送消息,如果消息没有被成功被Rabbit MQ 接收到,那么生产者回到异常报错,此时就可以进行回滚事务( channel .txRollback ) , 然后重试发送消息,如果收到了消息就可以提交事务( channel .t x Commit )。但问题是, RabbitMQ 事务机制一搞,基本上吞吐量就会下来,因为太消耗性能 所以一般来说,如果你要确保说写 RabbitM Q的消息不丢失,可以开始 confirm 模式,再生产者哪里设置开启 confirm 模式之后,你每次写都会分配一个唯一的 ID ,然后如果写入了 RabbitMQ 中, RabbitMQ 会给你回传一个 ack 确认消息,告诉你这个消息 ok 了。如果 RabbitMQ 没有处理这个消息,会回调你的n ack 接口,告诉你这个消息接收失败了,你可以进行重试而且你可以结合这个机制在自己的内存里面维护每个消息 ID 的状态,如果超过一定的时间还没有接收到这个消息的回调,你就可以重发 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务后会阻塞在哪里,但是 confirm 机制是异步的,你发送那个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收到后会异步回调你一个接口通知你这个消息收到了 所以一般在生产者这快避免数据丢失,都是用 confirm 机制rabbitMQ弄丢数据:
就是rabbitMQ自己弄丢了数据,但是你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化操磁盘,哪怕是 RabbitMQ 自己挂了,回复之后会自动读取之前存储的数据,一般数据不会丢失,除非是极其罕见的 RabbitMQ 还没有持久化,自己就挂了,但是可能会导致少量数据丢失,但是这个概率比较小。 设置持久化有两个步骤,第一个是传教 queue 的时候将其设置为持久化,这样可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候见消息的 deliveryMode 是指为 2 ,这就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去,必须要同时设置这两个持久化才行,哪怕是 RabbitMQ 挂了,再次重启,也会从磁盘上去重启恢复Queue,恢复这个Queue里面的数据 而且持久化可以跟 Rabbit MQ 那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知ack了,所以哪怕是持久化到硬盘之前, RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的 但是存在一种比较极端的情况会丢失数据,就是你消息写到了 RabbitMQ 里面但是还没有来得及持久化到磁盘里面,刚好这个时候 RabbitMQ 挂了,就会导致存在内存里的一点数据会丢失消费者弄丢了数据:
再rabbitMQ中,消费者弄丢数据只有一种可能,就是使用了默认autoAck机制,当你的消费者使用默认机制消费数据时,刚好你接收到一条消息的时候,你的消费者挂了,因为你用的是自动提交机制,此时RabbitMQ会认为你的消息消费了,就会给你传下一条消息过来消费,所以造成了消息丢失
消费者端进行消息消费需要改成手动ack,只有当你程序正常运行完后,你才会收到去提交akc,所以不会导致消息丢失
kafka:
消费端弄丢了数据:
大家都知道 Kafka 会自动提交 offset ,那么只要关闭自动提交offset,再处理完之后手动进行提交,就可以保证数据不会丢失。但是此时还是会有重复消费,比如你刚处理完数据,还没提交offset结果自己挂了,此时肯定会重复消费一次,需要自己保证幂等性就好。kafka弄丢了数据:
这块有一个比较常见的一个场景,就kafka某个 broker 宕机,然后重新选举 partition 的 leader 时。大家想想要是此时其他的 follower 刚好还没没有同步数据,结果此时的leader就挂了,然后选举某个 follower 成leader之后,他就少了一部分数据,这样会造成数据丢失。所以此时一般是要求起码设置如下4个参数:
给这个topic设置replication.factor参数:这个值必须大1,要求每个partition必须至少有2个副本
在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这是要求一个leader至少感知到有至少一个follower跟自己保持联系,没有掉队,这样才能确保leader挂了还有一个follower
在producer端设置acks=all:这个是要求每条数据,必须是写入所有的replica之后,才能认为是写入成功
在producer端设置retries=MAX:这是是要求一旦写入失败,就无限重试,卡在这里
生产者端:
如果你按照上面的思路设置了ack =all ,一定不丢失,要求是你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才会认为本次写成功了。如果没有满足这个条件,生产者会自动不断的重试,重试无限次转载地址:http://cgjva.baihongyu.com/