rmq批量消息

普通批量消息
批量消息对生产者来说就是可以一次性发送多条消息,支持同步、异步发送,不支持单向发送,主题和waitStoreMsgOK必须相同,tags可以不同,并且不支持延时消息和事务消息

		List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagB", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagC", "OrderID003", "Hello world 2".getBytes()));
//批量消息,支持同步、异步发送,不支持单向发送//主题和waitStoreMsgOK必须相同,并且不支持延时消息SendResult sendResult = producer.send(messages);System.out.println(sendResult);

MessageBatch#encode:生产者将多条消息封装成一个批量消息MessageBatch,它的body是多条消息encode序列化后拼接的结果,发送时放入RemotingCommand.body

public class MessageBatch extends Message implements Iterable {private final List messages;public byte[] encode() {//多条消息encode序列化后拼接的结果return MessageDecoder.encodeMessages(messages);}
}public class DefaultMQProducer extends ClientConfig implements MQProducer {private MessageBatch batch(Collection msgs) throws MQClientException {MessageBatch msgBatch;try {msgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}MessageClientIDSetter.setUniqID(msgBatch);//设置bodymsgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}msgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}
}public SendResult sendMessage(...){
...RemotingCommand request = null;request.setBody(msg.getBody());...
}

broker端将RemotingCommand.body取出来放入MessageExtBatch.body,填充属性后,写入日志文件

//SendMessageProcessor
private RemotingCommand sendBatchMessage(final RemotingCommand request,...) {...messageExtBatch.setBody(request.getBody());...
}

最终返回多个消息id,普通批量消息是生产者将多条消息打包发送,broker对批量消息的存储和消费者的消费和普通消息一样

SendResult [sendStatus=SEND_OK, msgId=7F000001388018B4AAC284D2FDCD0000,7F000001388018B4AAC284D2FDCD0001,7F000001388018B4AAC284D2FDCD0002, offsetMsgId=C0A8074100002A9F00000000000F2B38,C0A8074100002A9F00000000000F2C58,C0A8074100002A9F00000000000F2D78, messageQueue=MessageQueue [topic=BatchTest, brokerName=broker-a, queueId=1], queueOffset=6]

内部批量消息

内部批量消息需要在创建主题时指定queue.type属性为BatchCQ,dashboard上不支持该配置,需要通过mqAdmin工具创建,并且该配置只能在创建时指定

.\mqadmin.cmd updateTopic -c DefaultCluster -n 192.168.7.65:9876 -t InnerBatchTopic -a +queue.type=BatchCQ
create topic to 192.168.7.65:10911 success.
TopicConfig [topicName=InnerBatchTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+queue.type=BatchCQ}]

内部批量消息是生产者打包发送,broker端把多条消息当作1条写入,而不是先拆分成多条,因此发送结果中也只有一个消息id

SendResult [sendStatus=SEND_OK, 
msgId=7F000001149018B4AAC2851B0BAC0003, offsetMsgId=C0A8074100002A9F00000000000F38F9, messageQueue=MessageQueue [topic=InnerBatchTopic, brokerName=broker-a, queueId=7], queueOffset=3]

内部批量消息的特点:

  • 日志文件的存储:相当于将多条消息合并为一条存储,body中包含了多条消息,sysFlag有INNER_BATCH_FLAG、NEED_UNWRAP_FLAG,属性:INNER_NUM,记录了内部包含的消息个数;INNER_BASE:在消费队列的位点(第几条消息),它之后的消息的位点会加上内部消息个数
  • 消费队列的存储(BatchConsumeQueue):队列文件路径和普通消息完全不同,是${home}/store/batchconsumequeue,其中存放了消息的物理偏移量,大小,tagsCode、存储时间、其中第一条消息的消费位点、内部消息数,其中的tagsCode=0

消费(BatchConsumeQueue#getBatchMsgIndexBuffer):根据消费位点在消费队列中定位消息的位置时,因为每条批量消息包含的消息数不是固定的,所以无法根据消费位点直接计算出消息在队列的偏移量。

  1. 通过读取每个队列文件的第一条消息的起始消费位点来确定待拉取消息处于哪个文件,这里通过ConCurrentSkipListMap缓存了每个文件的第一条消息的位点,获取小于等于指定位点的最大Entry即可
public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {private ConcurrentSkipListMap offsetCache;private MappedFile searchOffsetFromCache(long msgOffset) {//获取小于等于指定位点的最大Entry,跳表支持范围查找Map.Entry floorEntry = this.offsetCache.floorEntry(msgOffset);...}//在新建队列文件时进行缓存private void cacheBcq(MappedFile bcq) {BatchOffsetIndex min = getMinMsgOffset(bcq, false, true);this.offsetCache.put(min.getMsgOffset(), min.getMappedFile());this.timeCache.put(min.getStoreTimestamp(), min.getMappedFile());}
}
  1. 得到文件后,使用二分法在文件中查找批量消息的偏移量
	private int binarySearch(ByteBuffer byteBuffer/*文件*/, int left/*第一条批量消息的偏移量*/, int right/*最后一条批量消息的偏移量*/, final int unitSize/*批量消息大小,队列中的批量消息是定长的,但代表的实际消息数不是定长的*/, final int unitShift/*消费位点对于每条消息的相对偏移量*/,long targetValue/*带查找的位点*/) {int maxRight = right;int mid = -1;while (left <= right) {//这里不是Math.ceil,是(pos / CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE,确保是CQ_STORE_UNIT_SIZE的整数倍mid = ceil((left + right) / 2);//中间消息位点long tmpValue = byteBuffer.getLong(mid + unitShift);if (tmpValue == targetValue) {return mid;}if (tmpValue > targetValue) {//中位点>待查找的,排除中位点到right的right = mid - unitSize;} else {if (mid == left) {//mid有可能等于left,但不可能等于right,因为是除法截断了小数//当mid等于left时,说明left和right相邻了,这时待查找的要么是left,要么是right,//先判断right的位点如果小于待找的,则right,否则leftif (mid + unitSize <= maxRight&& byteBuffer.getLong(mid + unitSize + unitShift) <= targetValue) {return mid + unitSize;} else {return mid;}} else {left = mid;}}}return -1;}
  1. 消费者(PullAPIWrapper#processPullResult):消费端根据sysFlag是否开启了INNER_BATCH_FLAG和NEED_UNWRAP_FLAG来判断是否是批量消息,如果是,将批量消息的body中的真正的消息的消息体和属性读取出来,其余的使用批量消息的属性(MessageDecoder#decodeMessage)

总结

内部批量消息不支持tag过滤,只能消费指定主题的所有消息,因为它在broker端的存储是单条tagsCode为0的消息存储的,如果消费者指定了tags过滤,只要指定的tags的hashcode不为0,所有的消息将在broker端过滤掉,一条都拉取不到

收藏
0
有帮助
0
没帮助
0
相关内容