rocketMQ -- offset管理
在rocketMQ中,offset用来管理每个消费队列的不同消费组的消费进度。对offset的管理分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
默认情况下,当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集;当消费模式为集群消费时,则使用远程模式管理offset,消息会被多个消费者消费,不同的是每个消费者只负责消费其中部分消费队列,添加或删除消费者,都会使负载发生变动,容易造成消费进度冲突,因此需要集中管理。同时,RocketMQ也提供接口供用户自己实现offset管理(实现OffsetStore接口)。
生产环境上一般使用集群模式,本文主要记录集群模式下offset的管理,即RemoteBrokerOffsetStore。
rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下:
broker端启动后,会调用BrokerController.initialize()方法,方法中会对offset进行加载,consumerOffsetManager.load()。获取文件内容后,序列化为ConsumerOffsetManager对象,实质是其属性ConcurrentMap offsetTable, offsetTable的数据结构为ConcurrentMap,是一个线程安全的容器,key的形式为topic@group(每个topic下不同消费组的消费进度),value也是一个ConcurrentMap,key为queueId,value为消费位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就可以对各个topic下不同消费组的消费位移进行获取与管理。
如下图所示,producer发送消息到broker之后,会将消息具体内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数起始offset获取待消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。在这个过程中,consumer提交的offset为本次请求的起始消费位置,即beginOffset;consume Queue中的offset定位了commitLog中具体消息的位置。
consume Queue中每个消息索引信息长度为20bytes,包括8位长度的offset,记录commitLog中消息内容的位移;4位长度的size,记录具体消息内容的长度;8位长度的tagHashCode,记录消息的tag的哈希值(订阅时如果指定tag,会根据HashCode快速查找订阅的消息)
对于consumer的消费请求处理(PullMessageProcessor.processRequest()),除了待消费的消息内容,broker在responseHeader(PullMessageResponseHeader)附带上当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。
其中nextBeginOffset是consumer在下一轮消息拉取时offset的重要依据,无论当次拉取的消息消费是否正常,nextBeginOffset都不会回滚,这是因为rocketMQ对消费异常的消息的处理是将消息重新发回broker端的重试队列(会为每个topic创建一个重试队列,以%RERTY%开头),达到重试时间后将消息投递到重试队列中进行消费重试。对消费异常的处理不是通过offset回滚,这使得客户端简化了offset的管理。
consumer启动过程中(Consumer主函数默认调用DefaultMQPushConsumer.start()方法)根据MessageModel选择对应的offsetStore,然后调用offsetStore.load()对offset进行加载,LocalFileOffsetStore是对本地文件的加载,而RemotebrokerOffsetStore是没有本地文件的,因此load()方法没有实现。在rebalance完成对messageQueue的分配之后会对messageQueue对应的消费位置offset进行更新。
Push模式下,computePullFromWhere()方法的实现类为RebalancePushImpl.class。根据配置信息consumeFromWhere进行不同的操作。ConsumeFromWhere的类型枚举如下,其中有三个已经被标记为Deprecated(基于rocketmq-all 4.6.0版本)
从最新的offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;如果lastOffset小于0说明是first start,没有offset信息,topic为重试topic时从0开始消费,否则请求获取该消息队列对应的消费队列consumeQueue的最大offset(maxOffset),从maxOffset开始消费
从第一个offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
否则从0开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
当lastOffset
鹏仔微信 15129739599 鹏仔QQ344225443 鹏仔前端 pjxi.com 共享博客 sharedbk.com
图片声明:本站部分配图来自网络。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!