消息队列如何保证消息的可靠传输
由于网络延迟、机器等原因,消息队列可能出现消息的丢失,那么如何解决这个问题呢。
RabbitMQ
生产者丢了数据
生产者发送消息的时候,可能网络原因没有到达消息队列保存。
RabbitMQ提供了事务的处理,发送消息前开启一个事务channel.txSelect
,然后发送消息,如果消息没有被mq收到,那么生产者会抛出异常,并回滚事务channel.txRollback
,重新发送数据,如果成功收到,则提交事务channel.txCommit
。
还有一种方法是,确认机制,每次写消息都会生成一个唯一的id,写成功后会收到一个ack,如果没有处理,要调用nack回调,重新发送。这一过程是异步的。
mq本身丢失了数据
开启 RabbitMQ 的持久化,消息写入之后会持久化到磁盘, RabbitMQ 挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。RabbitMQ 还没持久化,就挂了,可能导致少量数据丢失,这个概率较小。
消费端丢失数据
RabbitMQ 如果丢失了数据,主要是因为消费的时候,刚消费到,还没处理,结果进程挂了,RabbitMQ 认为消费了,数据就丢了。
这个时候得用 RabbitMQ 提供的 ack 机制,必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用,然后每次确保处理完的时候,再在程序里通知RabbitMQ。如何进程挂了,RabbitMQ 就认为没处理完,RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
kafka
生产者丢失数据
如何设置了ack=all,那么leader接收到数据后,所有的follower都同步后,才认为发送成功,否则生产者会不断重试,不会丢失数据。
kafka丢失数据
Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,就丢了一些数据。
需要设置如下参数:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,这样才能确保 leader 挂了还有一个 follower 。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为写成功了。 - 在 producer 端设置
retries=MAX
(很大的一个值,无限次重试):这个是要求一旦写入失败,就无限重试。
这样配置之后,在 Kafka broker 端可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
消费者丢失数据
唯一可能导致消费者弄丢数据的情况,消费到了这个消息,然后消费者自动提交了 offset,让 Kafka 以为已经消费了这个消息,但其实刚准备处理这个消息,还没处理就挂了,此时这条消息就丢了。
Kafka 会自动提交 offset,关闭自动提交 offset,在处理完之后手动提交 offset,可以保证数据不会丢。此时可能会有重复消费,刚处理完,还没提交 offset,结果挂了,会重复消费一次,保证幂等性就好了。