Kafka Retry Queue
admin
2024-04-20 16:31:50

        Kafka has no built-in retry mechanism and no dead-letter queue, so when Kafka is used as a message queue, message retry has to be implemented by the application itself.

Design

Create a new Kafka topic to act as the retry queue:

  1. Create a retry topic that receives messages waiting to be retried.
  2. The consumer of the business topic sends messages that need a retry to the retry topic.
  3. A consumer of the retry topic stores the pending messages in a Redis zset, sorted by their next consumption time.
  4. A scheduled task fetches the messages whose consumption time has arrived from Redis and sends them back to their original topic.
  5. A message that has been retried too many times is not retried again.
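The steps above can be sketched end-to-end without Kafka or Redis: a sorted map stands in for the Redis zset (score = next consumption time, in milliseconds), and a drain method stands in for the scheduled task. All names in this sketch are illustrative, not part of the project.

```java
import java.util.*;

// Minimal sketch of the retry flow: a sorted "zset" keyed by next-retry time,
// and a poller that re-delivers messages whose time has arrived.
public class RetryFlowSketch {
    static final int[] DELAYS = {10, 30, 60};       // seconds, like the schedule used later
    // score -> message, mimicking the Redis zset
    static final TreeMap<Long, String> zset = new TreeMap<>();

    // steps 2/3: a failed message is parked with its next consumption time
    static void consumerLater(String msg, int retryTimes, long nowMillis) {
        if (retryTimes >= DELAYS.length) return;    // step 5: retry budget exhausted
        zset.put(nowMillis + DELAYS[retryTimes] * 1000L, msg);
    }

    // step 4: the scheduled task drains everything whose time has arrived
    static List<String> due(long nowMillis) {
        List<String> ready = new ArrayList<>(zset.headMap(nowMillis, true).values());
        zset.headMap(nowMillis, true).clear();      // remove what was fetched
        return ready;
    }

    public static void main(String[] args) {
        consumerLater("m1", 0, 0);                  // due at t = 10s
        consumerLater("m2", 3, 0);                  // exceeds max retries, dropped
        System.out.println(due(5_000));             // [] - nothing due yet
        System.out.println(due(10_000));            // [m1]
    }
}
```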

Implementation

1. Create a Spring Boot project


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.lagou.kafka.demo</groupId>
    <artifactId>demo-retryqueue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-retryqueue</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2. application.properties

# bootstrap.servers
spring.kafka.bootstrap-servers=node1:9092
# key serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value serializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer group id: group.id
spring.kafka.consumer.group-id=retryGroup
# key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# redis database index
spring.redis.database=0
# redis host
spring.redis.host=node1
# redis port
spring.redis.port=6379
# redis password (empty by default)
spring.redis.password=
# maximum number of connections in the pool (a negative value means no limit)
spring.redis.jedis.pool.max-active=20
# maximum blocking wait time of the pool (a negative value means no limit)
spring.redis.jedis.pool.max-wait=-1
# maximum idle connections in the pool
spring.redis.jedis.pool.max-idle=10
# minimum idle connections in the pool
spring.redis.jedis.pool.min-idle=0
# connection timeout (milliseconds)
spring.redis.timeout=1000
# business topic
spring.kafka.topics.test=tp_demo_retry_01
# retry topic (the retry queue)
spring.kafka.topics.retry=tp_demo_retry_02

3. RetryqueueApplication

package com.lagou.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RetryqueueApplication {

    public static void main(String[] args) {
        SpringApplication.run(RetryqueueApplication.class, args);
    }
}

4. AppConfig

package com.lagou.kafka.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class AppConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // wire in the connection factory
        template.setConnectionFactory(factory);
        return template;
    }
}

5. RetryController

package com.lagou.kafka.demo.controller;

import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class RetryController {

    @Autowired
    private KafkaService kafkaService;

    @Value("${spring.kafka.topics.test}")
    private String topic;

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        // send the message to the business topic
        return kafkaService.sendMessage(record);
    }
}

6. KafkaService

package com.lagou.kafka.demo.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    private Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        // send synchronously and collect the metadata of the written record
        SendResult<String, String> result = this.kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        log.info("message sent: " + returnResult);
        return returnResult;
    }
}

7. RetryService

package com.lagou.kafka.demo.service;

import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;

@Service
public class RetryService {

    private static final Logger log = LoggerFactory.getLogger(RetryService.class);

    /**
     * Delay before the next consumption attempt after a failure (seconds):
     * the first retry waits 10 seconds, the second 30 seconds, the third 1 minute, and so on.
     */
    private static final int[] RETRY_INTERVAL_SECONDS =
            {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};

    /** retry topic */
    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // number of the upcoming retry attempt
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        // give up once the retry budget is exhausted
        if (nextConsumerTime == null) {
            return;
        }

        // build the retry envelope
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());

        // serialize and send to the retry topic
        String value = JSON.toJSONString(retryRecord);
        kafkaTemplate.send(retryTopic, null, value);
    }

    /**
     * Reads the retry counter from the record headers and returns the
     * number of the upcoming attempt.
     */
    private int getRetryTimes(ConsumerRecord<String, String> record) {
        int retryTimes = -1;
        for (Header header : record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

    /**
     * Computes the next consumption time for the pending message,
     * or null when it has been retried too many times.
     */
    private Date getNextConsumerTime(int retryTimes) {
        // stop retrying once retryTimes reaches the end of the schedule
        if (retryTimes >= RETRY_INTERVAL_SECONDS.length) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }
}
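Assuming the RETRY_INTERVAL_SECONDS table above, the mapping from retry count to delay can be exercised in isolation. Note that the bounds check must use >=: when the retry count equals the table length, indexing the array would throw ArrayIndexOutOfBoundsException instead of giving up. The class and method names below are illustrative, not part of the project.

```java
// Illustrates the backoff table used by RetryService: attempt n waits
// RETRY_INTERVAL_SECONDS[n] seconds, and attempts beyond the table give up.
public class BackoffDemo {
    static final int[] RETRY_INTERVAL_SECONDS =
            {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};

    // returns the delay in seconds for the given retry count, or -1 to give up
    static long delaySeconds(int retryTimes) {
        if (retryTimes >= RETRY_INTERVAL_SECONDS.length) return -1;
        return RETRY_INTERVAL_SECONDS[retryTimes];
    }

    public static void main(String[] args) {
        System.out.println(delaySeconds(0));   // 10
        System.out.println(delaySeconds(8));   // 7200 (2 hours, the last slot)
        System.out.println(delaySeconds(9));   // -1   (retry budget exhausted)
    }
}
```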

8. ConsumerListener

package com.lagou.kafka.demo.listener;

import com.lagou.kafka.demo.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // business processing; every second message fails on purpose
            log.info("consumed message: " + record);
            index++;
            if (index % 2 == 0) {
                throw new Exception("time to retry this one");
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            // schedule a retry: the message ends up in redis via the retry topic
            kafkaRetryService.consumerLater(record);
        }
    }
}

9. RetryListener

package com.lagou.kafka.demo.listener;

import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private Logger log = LoggerFactory.getLogger(RetryListener.class);

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topics.test}")
    private String bizTopic;

    // a batch variant could accept List<ConsumerRecord<String, String>> instead
    @KafkaListener(topics = "${spring.kafka.topics.retry}")
    public void consume(ConsumerRecord<String, String> record) {
        log.info("message waiting for retry: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);

        /*
         * To keep a large retry backlog from exhausting redis, messages could be
         * tiered by their next retry time: e.g. store messages that are due more
         * than half an hour from now in mysql, and periodically move the soon-due
         * ones into redis.
         */
        // order by next consumption time via a redis zset
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }

    /**
     * Scheduled task: read the messages whose retry time has arrived from redis
     * and send them back to their topic.
     */
    // alternatively: @Scheduled(cron = "2 * * * * *")
    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        log.warn("retryFromRedis----begin");
        long currentTime = System.currentTimeMillis();
        // fetch everything due up to now, highest score first
        Set<ZSetOperations.TypedTuple<Object>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // remove what was fetched
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        if (typedTuples == null) {
            return;
        }
        for (ZSetOperations.TypedTuple<Object> tuple : typedTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);

            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord<String, String> record = retryRecord.parse();
            ProducerRecord<String, String> recordReal = new ProducerRecord<>(
                    bizTopic,
                    record.partition(),
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers());
            kafkaTemplate.send(recordReal);
        }
        // TODO: on failure, push the unsent messages back into redis
    }
}

10. RetryRecord

package com.lagou.kafka.demo.entity;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public RetryRecord() {
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
    public Integer getRetryTimes() { return retryTimes; }
    public void setRetryTimes(Integer retryTimes) { this.retryTimes = retryTimes; }
    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public Long getNextTime() { return nextTime; }
    public void setNextTime(Long nextTime) { this.nextTime = nextTime; }

    /** Rebuilds a ProducerRecord carrying the retry counter in a header. */
    public ProducerRecord<String, String> parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();

        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        // RecordHeader takes a byte[] value, so hand over the backing array
        headers.add(new RecordHeader(KEY_RETRY_TIMES, retryTimesBuffer.array()));

        return new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
    }
}
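The retry counter travels as a 4-byte record header: parse() above packs the int with a ByteBuffer, and RetryService.getRetryTimes() unwraps it on the consumer side. The round trip can be checked on its own; the helper class below is illustrative, not part of the project.

```java
import java.nio.ByteBuffer;

// Shows how the retry counter survives the trip through a Kafka record header:
// packed into 4 big-endian bytes on send, unpacked on receive.
public class RetryHeaderCodec {

    // what RetryRecord.parse() does before handing the bytes to RecordHeader
    static byte[] encode(int retryTimes) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(retryTimes);
        return buffer.array();                 // 4 big-endian bytes
    }

    // what RetryService.getRetryTimes() does with header.value()
    static int decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3);
        System.out.println(raw.length);        // 4
        System.out.println(decode(raw));       // 3
    }
}
```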

Start the application and test it from a browser, e.g. by requesting /send/hello.
