rmq批量消息
admin
2024-02-20 14:36:38
0

普通批量消息
批量消息对生产者来说就是可以一次性发送多条消息,支持同步、异步发送,不支持单向发送,主题和waitStoreMsgOK必须相同,tags可以不同,并且不支持延时消息和事务消息

		List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagB", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagC", "OrderID003", "Hello world 2".getBytes()));
//批量消息,支持同步、异步发送,不支持单向发送//主题和waitStoreMsgOK必须相同,并且不支持延时消息SendResult sendResult = producer.send(messages);System.out.println(sendResult);

MessageBatch#encode:生产者将多条消息封装成一个批量消息MessageBatch,它的body是多条消息encode序列化后拼接的结果,发送时放入RemotingCommand.body

public class MessageBatch extends Message implements Iterable {private final List messages;public byte[] encode() {//多条消息encode序列化后拼接的结果return MessageDecoder.encodeMessages(messages);}
}public class DefaultMQProducer extends ClientConfig implements MQProducer {private MessageBatch batch(Collection msgs) throws MQClientException {MessageBatch msgBatch;try {msgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}MessageClientIDSetter.setUniqID(msgBatch);//设置bodymsgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}msgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}
}public SendResult sendMessage(...){
...RemotingCommand request = null;request.setBody(msg.getBody());...
}

broker端将RemotingCommand.body取出来放入MessageExtBatch.body,填充属性后,写入日志文件

//SendMessageProcessor
private RemotingCommand sendBatchMessage(final RemotingCommand request,...) {...messageExtBatch.setBody(request.getBody());...
}

最终返回多个消息id,普通批量消息是生产者将多条消息打包发送,broker对批量消息的存储和消费者的消费和普通消息一样

SendResult [sendStatus=SEND_OK, msgId=7F000001388018B4AAC284D2FDCD0000,7F000001388018B4AAC284D2FDCD0001,7F000001388018B4AAC284D2FDCD0002, offsetMsgId=C0A8074100002A9F00000000000F2B38,C0A8074100002A9F00000000000F2C58,C0A8074100002A9F00000000000F2D78, messageQueue=MessageQueue [topic=BatchTest, brokerName=broker-a, queueId=1], queueOffset=6]

内部批量消息

内部批量消息需要在创建主题时指定queue.type属性为BatchCQ,dashboard上不支持该配置,需要通过mqAdmin工具创建,并且该配置只能在创建时指定

.\mqadmin.cmd updateTopic -c DefaultCluster -n 192.168.7.65:9876 -t InnerBatchTopic -a +queue.type=BatchCQ
create topic to 192.168.7.65:10911 success.
TopicConfig [topicName=InnerBatchTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+queue.type=BatchCQ}]

内部批量消息是生产者打包发送,broker端把多条消息当作1条写入,而不是先拆分成多条,因此发送结果中也只有一个消息id

SendResult [sendStatus=SEND_OK, 
msgId=7F000001149018B4AAC2851B0BAC0003, offsetMsgId=C0A8074100002A9F00000000000F38F9, messageQueue=MessageQueue [topic=InnerBatchTopic, brokerName=broker-a, queueId=7], queueOffset=3]

内部批量消息的特点:

  • 日志文件的存储:相当于将多条消息合并为一条存储,body中包含了多条消息,sysFlag有INNER_BATCH_FLAG、NEED_UNWRAP_FLAG,属性:INNER_NUM,记录了内部包含的消息个数;INNER_BASE:在消费队列的位点(第几条消息),它之后的消息的位点会加上内部消息个数
  • 消费队列的存储(BatchConsumeQueue):队列文件路径和普通消息完全不同,是${home}/store/batchconsumequeue,其中存放了消息的物理偏移量,大小,tagsCode、存储时间、其中第一条消息的消费位点、内部消息数,其中的tagsCode=0

消费(BatchConsumeQueue#getBatchMsgIndexBuffer):根据消费位点在消费队列中定位消息的位置时,因为每条批量消息包含的消息数不是固定的,所以无法根据消费位点直接计算出消息在队列的偏移量。

  1. 通过读取每个队列文件的第一条消息的起始消费位点来确定待拉取消息处于哪个文件,这里通过ConCurrentSkipListMap缓存了每个文件的第一条消息的位点,获取小于等于指定位点的最大Entry即可
public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {private ConcurrentSkipListMap offsetCache;private MappedFile searchOffsetFromCache(long msgOffset) {//获取小于等于指定位点的最大Entry,跳表支持范围查找Map.Entry floorEntry = this.offsetCache.floorEntry(msgOffset);...}//在新建队列文件时进行缓存private void cacheBcq(MappedFile bcq) {BatchOffsetIndex min = getMinMsgOffset(bcq, false, true);this.offsetCache.put(min.getMsgOffset(), min.getMappedFile());this.timeCache.put(min.getStoreTimestamp(), min.getMappedFile());}
}
  1. 得到文件后,使用二分法在文件中查找批量消息的偏移量
	private int binarySearch(ByteBuffer byteBuffer/*文件*/, int left/*第一条批量消息的偏移量*/, int right/*最后一条批量消息的偏移量*/, final int unitSize/*批量消息大小,队列中的批量消息是定长的,但代表的实际消息数不是定长的*/, final int unitShift/*消费位点对于每条消息的相对偏移量*/,long targetValue/*带查找的位点*/) {int maxRight = right;int mid = -1;while (left <= right) {//这里不是Math.ceil,是(pos / CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE,确保是CQ_STORE_UNIT_SIZE的整数倍mid = ceil((left + right) / 2);//中间消息位点long tmpValue = byteBuffer.getLong(mid + unitShift);if (tmpValue == targetValue) {return mid;}if (tmpValue > targetValue) {//中位点>待查找的,排除中位点到right的right = mid - unitSize;} else {if (mid == left) {//mid有可能等于left,但不可能等于right,因为是除法截断了小数//当mid等于left时,说明left和right相邻了,这时待查找的要么是left,要么是right,//先判断right的位点如果小于待找的,则right,否则leftif (mid + unitSize <= maxRight&& byteBuffer.getLong(mid + unitSize + unitShift) <= targetValue) {return mid + unitSize;} else {return mid;}} else {left = mid;}}}return -1;}
  1. 消费者(PullAPIWrapper#processPullResult):消费端根据sysFlag是否开启了INNER_BATCH_FLAG和NEED_UNWRAP_FLAG来判断是否是批量消息,如果是,将批量消息的body中的真正的消息的消息体和属性读取出来,其余的使用批量消息的属性(MessageDecoder#decodeMessage)

总结

内部批量消息不支持tag过滤,只能消费指定主题的所有消息,因为它在broker端的存储是单条tagsCode为0的消息存储的,如果消费者指定了tags过滤,只要指定的tags的hashcode不为0,所有的消息将在broker端过滤掉,一条都拉取不到

相关内容

热门资讯

linux入门---制作进度条 了解缓冲区 我们首先来看看下面的操作: 我们首先创建了一个文件并在这个文件里面添加了...
C++ 机房预约系统(六):学... 8、 学生模块 8.1 学生子菜单、登录和注销 实现步骤: 在Student.cpp的...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
数字温湿度传感器DHT11模块... 模块实例https://blog.csdn.net/qq_38393591/article/deta...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
Redis 所有支持的数据结构... Redis 是一种开源的基于键值对存储的 NoSQL 数据库,支持多种数据结构。以下是...
win下pytorch安装—c... 安装目录一、cuda安装1.1、cuda版本选择1.2、下载安装二、cudnn安装三、pytorch...
MySQL基础-多表查询 文章目录MySQL基础-多表查询一、案例及引入1、基础概念2、笛卡尔积的理解二、多表查询的分类1、等...
keil调试专题篇 调试的前提是需要连接调试器比如STLINK。 然后点击菜单或者快捷图标均可进入调试模式。 如果前面...
MATLAB | 全网最详细网... 一篇超超超长,超超超全面网络图绘制教程,本篇基本能讲清楚所有绘制要点&#...
IHome主页 - 让你的浏览... 随着互联网的发展,人们越来越离不开浏览器了。每天上班、学习、娱乐,浏览器...
TCP 协议 一、TCP 协议概念 TCP即传输控制协议(Transmission Control ...
营业执照的经营范围有哪些 营业执照的经营范围有哪些 经营范围是指企业可以从事的生产经营与服务项目,是进行公司注册...
C++ 可变体(variant... 一、可变体(variant) 基础用法 Union的问题: 无法知道当前使用的类型是什...
血压计语音芯片,电子医疗设备声... 语音电子血压计是带有语音提示功能的电子血压计,测量前至测量结果全程语音播报࿰...
MySQL OCP888题解0... 文章目录1、原题1.1、英文原题1.2、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3...
【2023-Pytorch-检... (肆十二想说的一些话)Yolo这个系列我们已经更新了大概一年的时间,现在基本的流程也走走通了,包含数...
实战项目:保险行业用户分类 这里写目录标题1、项目介绍1.1 行业背景1.2 数据介绍2、代码实现导入数据探索数据处理列标签名异...
记录--我在前端干工地(thr... 这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前段时间接触了Th...
43 openEuler搭建A... 文章目录43 openEuler搭建Apache服务器-配置文件说明和管理模块43.1 配置文件说明...