# Rocket 消息队列

# 概述

Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者 以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案, 被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

# 配置

# 引入依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-rocketmq-starter</artifactId>
</dependency>

# 配置项

yml配置说明

framework:
  mq:
    ##消息队列地址(rocketMq中为namesrv的地址)
    url:
    ##认证设置,如果没有可不填
    accessKey:
    ##认证设置,如果没有可不填
    secretKey:
    rocketMq:
      ##消费超时,默认3000毫秒
      consumeTimeout:
      ##发送超时,默认3000毫秒
      sendTimeout:
      ##重试次数,默认为2
      retryTimesWhenSendFailed: 2
      ##异步重试次数,默认为2
      retryTimesWhenSendAsyncFailed: 2
      ##是否是事务模式
      isTransaction: false
      ##生产者组名称,必填
      producerGroupName: test-group
      ##rocketmq集群名称,用于创建topic,默认为DefaultCluster
      clusterName:
      ##rocketmq创建topic的等待时间,默认1s,单位:秒
      createTopicWaitTime: 1
      ##每个topic对应的写队列的数量,默认16
      writeQueueNums: 16
      ##每个topic对应的读队列的数量
      private int readQueueNums:  16
      ## topic权限设置
      perm: 6
      ##消费线程池最小线程数
      consumeThreadMin: 5
      ##消费线程池最大线程数
      consumeThreadMax: 64

# 使用说明

# 简单使用

# 消费者使用

需要实现ConsumerListener接口,实现方法receive,并添加注解HosConsumerAnno,例如

@HosConsumerAnno(topic = "test", group = "test-group")
public class TestConsumers implements ConsumerListener<String> {
  @Override
  public boolean receive(List<HosReceiveMessage<String>> list) {
    list.forEach(message -> {
      System.out.println("=========================receive message:================================");
      System.out.println(message.getMessageId());
      System.out.println(message.getTopic());
      System.out.println(message.getProperties());
      System.out.println(message.getData());
      });
    return true;
  }
}

其中,receive方法中为具体的业务处理逻辑 HosConsumerAnno注解的参数如下

/**
* 主题
  */
  String[] topic();

/**
* 消费者组名
  */
  String group() default "";

/**
* 其他参数
* @return
  */
  HosConsumerListenerParams[] params() default {};

# 生产者使用

使用@Autowired注入HosMqMessageSendService,例如

@Autowired
private HosMqMessageSendService hosMqMessageSendService;

@RequestMapping("/sendMsg")
public String sendMsg(@RequestParam String content, @RequestParam String topic) {
hosMqMessageSendService.sendMsg(topic,content);
return "success";
}

HosMqMessageSendService方法如下

//根据主题发送消息
public HosMqSendResult sendMsg(String topic,Object data)

//根据主题异步发送消息
public void sendMsgAsync(String topic,Object data,  AsyncCallBackBuilder asyncCallBackBuilder)

# RocketMq中的特殊使用

# Tag的使用

关于Tag的使用,在使用注解中,可使用::分割topic和tag,例如 @HosConsumerAnno(topic = "topicName::tag", group = "test-group") 使用HosMqMessageSendService发送时,亦然,例如 hosMqMessageSendService.sendMsg("topicName::tag",content);

# 事务使用

rocketMq中事务的使用,针对在生产者,关于事务的详细讲解可参考https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/, 1、将framework.mq.rocketMqisTransaction设置为true 2、定义事务监听器:实现TransactionListener,并注册为Bean

public class DefaultTransactionListener implements TransactionListener {

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 实现事务状态检查
        return LocalTransactionState.COMMIT_MESSAGE;
    }


    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 实现事务状态检查
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
@Bean
public TransactionListener defaultTransactionListener(){
    return new DefaultTransactionListener();
}

# 特有功能说明

# 发送消息RocketHosMqProducer
/**
*  单向发送
* @param rocketMqSendParam
* @param data
* @return
* @param <T>
*/
public <T> void sendMsgOneWay(RocketMqSendParam rocketMqSendParam, T data)

    /**
     *  发送顺序消息
     * @param rocketMqSendParam
     * @param data
     * @return
     * @param <T>
     */
    public <T> HosMqSendResult sendMsgQueue(RocketMqSendParam rocketMqSendParam, T data,
                                            MessageQueueSelector  selector, Object arg) 

    //同步发送消息
    public <T> HosMqSendResult sendMsg(RocketMqSendParam rocketMqSendParam, T data) 
    
    //批量发送同步消息
    public <T> HosMqSendResult sendBatchMsg(RocketMqSendParam rocketMqSendParam, List<T> data) 

    //发送消息,异步回调
    public <T> void sendAsyncMsg(RocketMqSendParam rocketMqSendParam, T data, AsyncCallBackBuilder asyncCallBackBuilder) 

    //批量发送消息,异步回调
    public <T> void sendAsyncBatchMsg(RocketMqSendParam rocketMqSendParam, List<T> data, AsyncCallBackBuilder asyncCallBackBuilder) {

rocketMqSendParam--发送参数,属性如下

/**
* 待发送主题,只允许设置一个,不允许为空
  */
  private String topic;

/**
* 待发送tag, 可为 空
  */
  private String tag;

/**
* 待发送时间,格式为 yyyy-MM-dd HH:mm:ss
  */
  private String sendTime;

/**
* 延迟时间,单位为秒
  */
  private Long delaySeconds;

data--消息内容 AsyncCallBackBuilder -- 异步回调的方法,属性如下

//成功后的回调
private Consumer<HosMqSendResult> successCallback;

//异常回调
private Consumer<Throwable> failCallback;
# 消费者的特有参数

除了topic和group外,其他的参数通过params属性配置,params中为HosConsumerListenerParams注解 该注解中包含三个参数key, value , type, key--键, value -- 值, type类型-- 包含BOOL,INT,LONG,FLOAT,DOUBLE,STRING,DATE_Y4M2D2,DATE_Y4M2D2_H2M2S2 分别代表布尔,整形,长整形,浮点,双浮点,字符串,yyyy-mm-dd类型的日期,yyyy-mm-dd hh:mm:ss类型的日期,默认为字符串

@HosConsumerAnno(topic = {"test", "test5"}, group = "test-group"
,params = {
  @HosConsumerListenerParams(key = "broadcast", value = "true", type = AnnoTypeEnum.BOOL),
  @HosConsumerListenerParams(key = "idempotent", value = "true", type = AnnoTypeEnum.BOOL),
  @HosConsumerListenerParams(key = "idempotentKey", value = "id"),
  }
)
public class TestConsumers implements ConsumerListener<String>

其含义如下: broadcast--是否为广播模式 idempotent--是否开启幂等性判断 idempotentKey--幂等性的key

# 消息发送前以及接收到消息的增强处理

实现HosMessageDealProcessor接口,并注册为Bean

public interface HosMessageDealProcessor {

    /**
     * 消息处理前置处理
     * @param msg 消息对象(封装后的对象),包含了消息内容等信息
     */
    void processMessageBeforeMsgSet(MqMsg msg);

    /**
     * 获取消息处理参数
     * @param mqSendParam 发送消息参数
     * @param message 消息对象(各种消息队列的原生对象)
     * @return
     */
    void processMessageBeforePull(JSONObject mqSendParam, Object message);

    /**
     * 消息接收到,进行业务逻辑前置处理
     * @param messageExt 消息对象(各种消息队列的原生对象)
     * @param receiveMessage 接收到的消息对象(封装后的对象)包含了消息内容、消息ID等信息
     */
    void processMessageReceive(Object messageExt, HosReceiveMessage receiveMessage);

}

# 自定义扩展

可在已有的基础上自定义相关类扩展 HosMqProducer--生产者的处理类

//初始化
void init(MqProperties config);

<T> HosMqSendResult sendMsg(String topic,T data);

/**
* @param data  待发送数据
* @return
  */
  <T> HosMqSendResult sendMsg(Map<String, String> params,T data);


/**
* @param data  待发送数据
* @return
  */
  <T> void sendMsgAsync(String topic,T data, AsyncCallBackBuilder asyncCallBackBuilder);


/**
* @param data  待发送数据
* @return
  */
  <T> void sendMsgAsync(Map<String, String> params,T data, AsyncCallBackBuilder asyncCallBackBuilder);

HosMqMessageHandler--消费者的处理类

/**
* 初始化
* @param config
  */
  void init(MqProperties config);

/*
* 初始化所有的消费者
  */
  void initConsumerLister(List<ConsumerListener> toDealListenerList);

注入为Bean,例如

@Bean
public HosMqProducer rocketHosMqProducer(@Autowired(required = false) TransactionListener transactionListener,
@Autowired(required = false) RocketMessageDealProvider rocketMessageDealProvider){
    return new RocketHosMqProducer(transactionListener,rocketMessageDealProvider);
}

@Bean
public HosMqMessageHandler rocketHosMqMessageHandler(@Autowired(required = false) RocketMessageDealProvider rocketMessageDealProvider){
    return new RocketHosMqMessageHandler(rocketMessageDealProvider);
}