Skip to the content.

walk-mq 消息组件使用说明

为什么使用

版本及引用

当前版本

1.10(2020/8/12)

引用方式

<!-- 阿里ons消息队列组件支持 -->
<dependency>
    <groupId>org.walkframework.boot</groupId>
    <artifactId>walk-mq-ons-starter</artifactId>
    <version>${最新版本}</version>
</dependency>
<!-- kafka消息队列组件支持 -->
<dependency>
    <groupId>org.walkframework.boot</groupId>
    <artifactId>walk-mq-kafka-starter</artifactId>
    <version>${最新版本}</version>
</dependency>

快速开始

加入配置信息

指定消息类型,参考对应的Properties定义

##ons配置,type声明对应到OnsProperties定义
walk.mq.base.type = ons  
walk.mq.base.servers = ip:port
walk.mq.base.access-key = xx
walk.mq.base.secret-key = xx
walk.mq.base.producer.producer-id = PID_XXXX
walk.mq.base.consumer.consumer-id = CID_XXXX

启用生产者

@Component
@EnableScheduling
public class Producer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private MqOrderedProducer baseMqOrderedProducer;

    private int counter = 0;

    @Scheduled(fixedRate = 5000)
    public void produce() {
        String message = "this is a test message by order " + (++counter);
        logger.info("[test] start send message [{}]", message);
        baseMqOrderedProducer.send("test", "test", message);
    }
}

启用消费者监听

@Component
@MqListener(topics = "${配置项}", profile="指定使用哪个消息配置") // 声明要监听的topic列表...其他配置详见注解
public class ConsumerListener extends AbstractOrderedMessageListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void execute(String message) {
        logger.info("[test]start consume message [{}]", message);
    }
}

更多设置

org.walkframework.boot.mq.kafka.config.KafkaProperties; // kafka配置项
org.walkframework.boot.mq.ons.config.OnsProperties; // ons配置项

<--缓存
回列表
分布式定时任务-->