由于网络延迟、机器等原因,消息队列可能出现消息的丢失,那么如何解决这个问题呢。

RabbitMQ

生产者丢了数据

生产者发送消息的时候,可能网络原因没有到达消息队列保存。
RabbitMQ提供了事务的处理,发送消息前开启一个事务channel.txSelect,然后发送消息,如果消息没有被mq收到,那么生产者会抛出异常,并回滚事务channel.txRollback,重新发送数据,如果成功收到,则提交事务channel.txCommit
还有一种方法是,确认机制,每次写消息都会生成一个唯一的id,写成功后会收到一个ack,如果没有处理,要调用nack回调,重新发送。这一过程是异步的。

mq本身丢失了数据

开启 RabbitMQ 的持久化,消息写入之后会持久化到磁盘, RabbitMQ 挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。RabbitMQ 还没持久化,就挂了,可能导致少量数据丢失,这个概率较小。

消费端丢失数据

RabbitMQ 如果丢失了数据,主要是因为消费的时候,刚消费到,还没处理,结果进程挂了,RabbitMQ 认为消费了,数据就丢了。
这个时候得用 RabbitMQ 提供的 ack 机制,必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用,然后每次确保处理完的时候,再在程序里通知RabbitMQ。如何进程挂了,RabbitMQ 就认为没处理完,RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

kafka

生产者丢失数据

如何设置了ack=all,那么leader接收到数据后,所有的follower都同步后,才认为发送成功,否则生产者会不断重试,不会丢失数据。

kafka丢失数据

Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,就丢了一些数据。
需要设置如下参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,这样才能确保 leader 挂了还有一个 follower 。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为写成功了。
  • 在 producer 端设置 retries=MAX (很大的一个值,无限次重试):这个是要求一旦写入失败,就无限重试。
    这样配置之后,在 Kafka broker 端可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

消费者丢失数据

唯一可能导致消费者弄丢数据的情况,消费到了这个消息,然后消费者自动提交了 offset,让 Kafka 以为已经消费了这个消息,但其实刚准备处理这个消息,还没处理就挂了,此时这条消息就丢了。
Kafka 会自动提交 offset,关闭自动提交 offset,在处理完之后手动提交 offset,可以保证数据不会丢。此时可能会有重复消费,刚处理完,还没提交 offset,结果挂了,会重复消费一次,保证幂等性就好了。