# Rocket 消息队列
# 概述
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者 以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案, 被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
# 配置
# 引入依赖
<dependency>
<groupId>com.mediway.hos</groupId>
<artifactId>hos-framework-rocketmq-starter</artifactId>
</dependency>
# 配置项
yml配置说明
framework:
mq:
##消息队列地址(rocketMq中为namesrv的地址)
url:
##认证设置,如果没有可不填
accessKey:
##认证设置,如果没有可不填
secretKey:
rocketMq:
##消费超时,默认3000毫秒
consumeTimeout:
##发送超时,默认3000毫秒
sendTimeout:
##重试次数,默认为2
retryTimesWhenSendFailed: 2
##异步重试次数,默认为2
retryTimesWhenSendAsyncFailed: 2
##是否是事务模式
isTransaction: false
##生产者组名称,必填
producerGroupName: test-group
##rocketmq集群名称,用于创建topic,默认为DefaultCluster
clusterName:
##rocketmq创建topic的等待时间,默认1s,单位:秒
createTopicWaitTime: 1
##每个topic对应的写队列的数量,默认16
writeQueueNums: 16
##每个topic对应的读队列的数量
private int readQueueNums: 16
## topic权限设置
perm: 6
##消费线程池最小线程数
consumeThreadMin: 5
##消费线程池最大线程数
consumeThreadMax: 64
# 使用说明
# 简单使用
# 消费者使用
需要实现ConsumerListener接口,实现方法receive,并添加注解HosConsumerAnno,例如
@HosConsumerAnno(topic = "test", group = "test-group")
public class TestConsumers implements ConsumerListener<String> {
@Override
public boolean receive(List<HosReceiveMessage<String>> list) {
list.forEach(message -> {
System.out.println("=========================receive message:================================");
System.out.println(message.getMessageId());
System.out.println(message.getTopic());
System.out.println(message.getProperties());
System.out.println(message.getData());
});
return true;
}
}
其中,receive方法中为具体的业务处理逻辑 HosConsumerAnno注解的参数如下
/**
* 主题
*/
String[] topic();
/**
* 消费者组名
*/
String group() default "";
/**
* 其他参数
* @return
*/
HosConsumerListenerParams[] params() default {};
# 生产者使用
使用@Autowired注入HosMqMessageSendService,例如
@Autowired
private HosMqMessageSendService hosMqMessageSendService;
@RequestMapping("/sendMsg")
public String sendMsg(@RequestParam String content, @RequestParam String topic) {
hosMqMessageSendService.sendMsg(topic,content);
return "success";
}
HosMqMessageSendService方法如下
//根据主题发送消息
public HosMqSendResult sendMsg(String topic,Object data)
//根据主题异步发送消息
public void sendMsgAsync(String topic,Object data, AsyncCallBackBuilder asyncCallBackBuilder)
# RocketMq中的特殊使用
# Tag的使用
关于Tag的使用,在使用注解中,可使用::分割topic和tag,例如 @HosConsumerAnno(topic = "topicName::tag", group = "test-group") 使用HosMqMessageSendService发送时,亦然,例如 hosMqMessageSendService.sendMsg("topicName::tag",content);
# 事务使用
rocketMq中事务的使用,针对在生产者,关于事务的详细讲解可参考https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/, 1、将framework.mq.rocketMqisTransaction设置为true 2、定义事务监听器:实现TransactionListener,并注册为Bean
public class DefaultTransactionListener implements TransactionListener {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 实现事务状态检查
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 实现事务状态检查
return LocalTransactionState.COMMIT_MESSAGE;
}
}
@Bean
public TransactionListener defaultTransactionListener(){
return new DefaultTransactionListener();
}
# 特有功能说明
# 发送消息RocketHosMqProducer
/**
* 单向发送
* @param rocketMqSendParam
* @param data
* @return
* @param <T>
*/
public <T> void sendMsgOneWay(RocketMqSendParam rocketMqSendParam, T data)
/**
* 发送顺序消息
* @param rocketMqSendParam
* @param data
* @return
* @param <T>
*/
public <T> HosMqSendResult sendMsgQueue(RocketMqSendParam rocketMqSendParam, T data,
MessageQueueSelector selector, Object arg)
//同步发送消息
public <T> HosMqSendResult sendMsg(RocketMqSendParam rocketMqSendParam, T data)
//批量发送同步消息
public <T> HosMqSendResult sendBatchMsg(RocketMqSendParam rocketMqSendParam, List<T> data)
//发送消息,异步回调
public <T> void sendAsyncMsg(RocketMqSendParam rocketMqSendParam, T data, AsyncCallBackBuilder asyncCallBackBuilder)
//批量发送消息,异步回调
public <T> void sendAsyncBatchMsg(RocketMqSendParam rocketMqSendParam, List<T> data, AsyncCallBackBuilder asyncCallBackBuilder) {
rocketMqSendParam--发送参数,属性如下
/**
* 待发送主题,只允许设置一个,不允许为空
*/
private String topic;
/**
* 待发送tag, 可为 空
*/
private String tag;
/**
* 待发送时间,格式为 yyyy-MM-dd HH:mm:ss
*/
private String sendTime;
/**
* 延迟时间,单位为秒
*/
private Long delaySeconds;
data--消息内容 AsyncCallBackBuilder -- 异步回调的方法,属性如下
//成功后的回调
private Consumer<HosMqSendResult> successCallback;
//异常回调
private Consumer<Throwable> failCallback;
# 消费者的特有参数
除了topic和group外,其他的参数通过params属性配置,params中为HosConsumerListenerParams注解 该注解中包含三个参数key, value , type, key--键, value -- 值, type类型-- 包含BOOL,INT,LONG,FLOAT,DOUBLE,STRING,DATE_Y4M2D2,DATE_Y4M2D2_H2M2S2 分别代表布尔,整形,长整形,浮点,双浮点,字符串,yyyy-mm-dd类型的日期,yyyy-mm-dd hh:mm:ss类型的日期,默认为字符串
@HosConsumerAnno(topic = {"test", "test5"}, group = "test-group"
,params = {
@HosConsumerListenerParams(key = "broadcast", value = "true", type = AnnoTypeEnum.BOOL),
@HosConsumerListenerParams(key = "idempotent", value = "true", type = AnnoTypeEnum.BOOL),
@HosConsumerListenerParams(key = "idempotentKey", value = "id"),
}
)
public class TestConsumers implements ConsumerListener<String>
其含义如下: broadcast--是否为广播模式 idempotent--是否开启幂等性判断 idempotentKey--幂等性的key
# 消息发送前以及接收到消息的增强处理
实现HosMessageDealProcessor接口,并注册为Bean
public interface HosMessageDealProcessor {
/**
* 消息处理前置处理
* @param msg 消息对象(封装后的对象),包含了消息内容等信息
*/
void processMessageBeforeMsgSet(MqMsg msg);
/**
* 获取消息处理参数
* @param mqSendParam 发送消息参数
* @param message 消息对象(各种消息队列的原生对象)
* @return
*/
void processMessageBeforePull(JSONObject mqSendParam, Object message);
/**
* 消息接收到,进行业务逻辑前置处理
* @param messageExt 消息对象(各种消息队列的原生对象)
* @param receiveMessage 接收到的消息对象(封装后的对象)包含了消息内容、消息ID等信息
*/
void processMessageReceive(Object messageExt, HosReceiveMessage receiveMessage);
}
# 自定义扩展
可在已有的基础上自定义相关类扩展 HosMqProducer--生产者的处理类
//初始化
void init(MqProperties config);
<T> HosMqSendResult sendMsg(String topic,T data);
/**
* @param data 待发送数据
* @return
*/
<T> HosMqSendResult sendMsg(Map<String, String> params,T data);
/**
* @param data 待发送数据
* @return
*/
<T> void sendMsgAsync(String topic,T data, AsyncCallBackBuilder asyncCallBackBuilder);
/**
* @param data 待发送数据
* @return
*/
<T> void sendMsgAsync(Map<String, String> params,T data, AsyncCallBackBuilder asyncCallBackBuilder);
HosMqMessageHandler--消费者的处理类
/**
* 初始化
* @param config
*/
void init(MqProperties config);
/*
* 初始化所有的消费者
*/
void initConsumerLister(List<ConsumerListener> toDealListenerList);
注入为Bean,例如
@Bean
public HosMqProducer rocketHosMqProducer(@Autowired(required = false) TransactionListener transactionListener,
@Autowired(required = false) RocketMessageDealProvider rocketMessageDealProvider){
return new RocketHosMqProducer(transactionListener,rocketMessageDealProvider);
}
@Bean
public HosMqMessageHandler rocketHosMqMessageHandler(@Autowired(required = false) RocketMessageDealProvider rocketMessageDealProvider){
return new RocketHosMqMessageHandler(rocketMessageDealProvider);
}