rocketMQ offset/丢消息/重复消费
消息队列常见的问题:如消息什么场景会重复消费,如何解决。什么场景下会丢失,如何解决。
重复消费
rocketMQ什么场景会重复消费?
这个就跟offset有关系了。
在rocketMQ中,一种类型的消息会放到一个topic里,一般为了并行,一个topic会有多个queue,offset是指某个topic下的一条消息在msg queue中的位置。通过offset可以定位到这条消息,或者指示消费者从这个消息开始向后处理。
consumer消费数据后,每隔一段时间会把已经消费过的的消息的offset提交一下,表示该消息已消费,若重启,下次会从上次提交的offset的下一个开始消费。
但凡是都有意外,比如系统重启了,进程挂掉了,就可能导致consumer消费了数据,但是没来得及提交offset,这样就导致恢复后的consumer会从上次提交的offset开始消费,所以就产生重复消费。
offset分为两种,本地文件类型和broker代存。
rocketMQ集群有两种消费模式:
1.clustering(集群)模式(默认),就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息不一样。这种情况下,由broker存储和控制offset的值,使用RemoteBrokerOffsetStore
结构。
2.broadcasting(广播)模式,每个consumer都收到这个topic的全部消息,consumer间互不干扰,使用LocalfileOffsetStore
,把offset存到本地。
如何解决重复消费的问题
消费方要保证幂等性,也就是消费多次和消费一次的效果是一样的。
如何保证幂等呢?
两种方案:
1.唯一索引,通过mysql唯一索引约束,保证不会重复插入。
2.缓存,将消费过的数据缓存起来,下次消费先校验,消费过直接返回。
消息丢失
场景:
producer生产消息、Broker处理消息、消费时都会出现丢失消息的情况。
可分为三个阶段:
1.Producer 发送消息阶段
2.Broker 处理消息阶段
3.Consumer 消费消息阶段
1.Producer 发送消息阶段
发送消息阶段设计Produc到Broker的网络通信,因此会有丢失消息的可能。tcp也存在丢包可能,当发送数据频率过快,接收端来不及处理,会造成数据丢失。(原因有缓冲区溢出、多线程同步问题)。tcp的解决方案,是控制发送频率,重试机制。
rocketMQ的解决方案:
1.使用同步发送方式,等待broker结果。
2.发送失败或超时,重发。
3.broker多master架构模式,即使某台broker挂了,也能保证消息可以投到另一个正常的broker。
1.使用同步发送方式
rocketMQ提供了三种发送方式:
1.同步发送:Producer向broker发送消息,阻塞当前线程等待响应结果。
2.异步发送:Producer构建一个向broker发消息的任务,把任务丢到线程池,执行完该任务时,回调用户自定义的回调函数,执行处理结果。
3.One way发送:只发不管结果。
在实际调用send方法时,不指定回调函数,则默认使用同步发送方式,这是丢失几率最小的方式。
2.超时重发
在循环里校验发送结果,结果异常时重新循环发送,默认3次。
3.broker多master架构模式,即使某台broker挂了,也能保证消息可以投到另一个正常的broker
如果只有一个broker节点,那么当broker宕机了,即使Producer有重试机制,那么也无法保存消息。多主模式,提高可用性。
总结:
利用同步发送 + 重试机制 + 多个 master 节点,尽可能减小消息丢失的可能性。
2.Broker 处理消息阶段
broker处理阶段有两个场景可能会出现丢消息:
1.消息投递到broker时,并没有直接落到磁盘,而是先存到page cache,然后通过配置的刷盘策略将page cache的数据保存到磁盘中。这个机制可以提高broker的性能,因为写入page cache要比page cache写入磁盘快的多。broker不会等待消息写入磁盘才响应结果,但是这种异步保存,当服务器宕机了,则会丢失page cache中的消息。
解决方案:
rocketMQ也提供了同步刷盘的策略。
public enum FlushDiskType {SYNC_FLUSH, // 同步刷盘 ASYNC_FLUSH// 异步刷盘(默认)}
2.即使设置同步刷盘,但是如果磁盘损坏,消息还是会丢失。
解决方案:
rocketMQ对broker采取主从模式,可以给broker指定slave,slave为同步刷盘策略。
此模式下,Producer每次发送,都会等broker主从节点都落盘成功,broker才会当做消息投递成功,保证消息不丢失。
总结:
在broker端,消息丢失场景主要在刷盘策略和同步机制。
RocketMQ默认broker的刷盘策略为异步刷盘,即使有主从,同步方式也默认为异步同步,这样可以提高处理消息的效率,同时,会有丢消息的可能。因此,为了避免丢消息,可通过同步刷盘策略 + 主从节点 + 同步slave策略。
3.consumer 消费阶段
即使前面流程都没问题,但是也存在consumer没有消费到消息的可能。
首先要了解consumer是如何消费broker的消息的。
Consumer先pull消息到本地,消费完成后返回ack。
通常消费消息的ack机制分两种方式:
1.先提交后消费。
2.先消费,消费成功后提交(at least once)。
当采用方式1时,会有丢消息的可能,因为消费进程可能在提交ack后挂掉,重启后会从下一个消息开始消费,漏掉上一个消息。不过方式1可以解决重复消费问题。因此,rocketMQ默认实现的是方式2,也就是通过at least once机制保证消息可靠消费。
场景2:
消费失败
如果consumer只消费一次,那么消费失败了也算是丢失该消息。
因此,提供了消费重试机制。
总结:
Consumer 消费阶段主要通过 at least once机制 + 消费重试机制 保证消费可靠性。
消息的有序性
对于链路较长的消息,要保证消费时的顺序是和发送的顺序相同。比如:创建订单消息M1,支付消息M2,支付完成消息M3,即便发送时是按顺序发的,那broker接收时有没有可能顺序是乱的呢?是有可能的。网络原因。即便按顺序到达broker,如果落到不同queue,消费顺序依然不能保证。
所以,解决接收时的有序问题,可通过同步发送方式解决。
对于落到不同队列上的问题,可通过分区,确保消息都落到同一个队列。
落到同一个队列也无法保证Consumer消费时的顺序,因为消费M1消息时可能由于网络问题,或者程序问题,导致M1的消费落后于M2.为了解决这个问题,可以通过将这三个消息都用同一个Consumer来消费。
顺序消费包括全局有序和局部有序,rocketMQ只是分区顺序。要全局顺序只能一个分区。
默认情况下,rocketMQ的Producer发消息是采用轮训的方式发送到不同的队列。这种情况下是无法保证有序发送的。
RocketMQ是如何保证分区顺序的: