1、什么是分布式消息队列?
- 消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于 数据通信 来进行分布式系统的集成。
- 通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等等功能,其作为 分布式系统架构 中的一个重要组件,有着举足轻重的地位。
2、消息队列--历程与区别
2.1、发布历程
下图根据时间线展示了不同时间点产生的消息队列产品,主要的产品有:
2.2、各个中间件的区别
3、中间件--详解
3.1、Apache Kafka
- 简介
Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效、可扩展良好 并且 可持久化。它的 分区特性,可复制 和 可容错 都是其不错的特性。 - 架构图
基本术语
- Producer:消息生产者。
- Topic:Topic是个抽象的虚拟概念,一个集群可以有多个Topic,作为一类消息的标识。
- Partition:Partition是个物理概念,一个Topic对应一个或多个Partition。
- Replicas:一个Partition有多个Replicas副本。
- Consumer:消息读取者。消费者订阅主题,并按照一定顺序读取消息。
- Offset:偏移量是一种元数据,是不断递增的整数。
- Broker:独立的Kafka服务器。
主要特性
- 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化;
- 高吞吐:在一台普通的服务器上既可以达到 10W/s 的 吞吐速率;
- 完全的分布式系统:Broker、Producer 和 Consumer 都原生自动支持 分布式,自动实现 负载均衡;
- 支持 同步 和 异步 复制两种 高可用机制;
- 支持 数据批量发送 和 拉取;
- 零拷贝技术(zero-copy):减少 IO 操作步骤,提高 系统吞吐量;
- 数据迁移、扩容 对用户透明;
- 无需停机 即可扩展机器;
- 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;
优点
- 客户端语言丰富:支持 Java、.Net、PHP、Ruby、Python、Go 等多种语言;
- 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
- 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性 和 可靠性,理论上支持 消息无限堆积;
- 支持批量操作;
- 消费者 采用 Pull方式 获取消息。消息有序,通过控制 能够保证所有消息被消费且仅被消费一次;
- 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;
- 在 日志领域 比较成熟,被多家公司和多个开源项目使用。
缺点
- Kafka 单机超过64个 队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长;
- 使用 短轮询方式,实时性 取决于 轮询间隔时间;
- 消费失败 不支持重试;
- 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序;
- 社区更新较慢。
使用场景
- 日志收集:大量的日志消息先写入kafka,数据服务通过消费kafka消息将数据落地;
- 消息系统:解耦生产者和消费者、缓存消息等;
- 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动
- 运营指标:记录运营、监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈。
- 流式处理:比如spark streaming
3.2、RabbitMQ
- 简介
RabbitMQ 是实现了 高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件(英语:Message-oriented middleware))。RabbitMQ服务器是用 Erlang 语言 编写的,而 群集和故障转移 是构建在开放电信平台框架上的。所有主要的编程语言均有与 代理接口通讯 的客户端函式库。 - 架构图
基本术语
- Broker:接收客户端链接实体,实现AMQP消息队列和路由功能;
- Virtual Host:是一个虚拟概念,权限控制的最小单位。
- Exchange:接收消息生产者的消息并将消息转发到队列。
- Message Queue:消息队列,存储为被消费的消息;
- Message:由Header和Body组成,Header是生产者添加的各种属性;
- Binding:Binding连接起了Exchange和Message Queue。
- Connection:在Broker和客户端之间的TCP连接;
- Channel:信道。Broker和客户端只有tcp连接是不能发送消息的,必须创建信道
- Command:AMQP命令,客户端通过Command来完成和AMQP服务器的交互。
优点
- 基于AMQP协议:除了Qpid,RabbitMQ 是唯一一个 实现了AMQP标准的消息服务器;
- 健壮、稳定、易用;
- 社区活跃,文档完善;
- 支持定时消息;
- 可插入的身份验证,授权,支持TLS和LDAP;
- 支持根据消息标识查询消息,也支持根据消息内容查询消息。
缺点
- erlang 开发源码难懂,不利于做二次开发和维护;
- 接口和协议复杂,学习和维护成本较高。
3.3、Apache ActiveMQ
- 简介
ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持 JMS 1.1 和 J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端 和 协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。 - 架构图
主要特性
- 服从JMS规范:JMS 规范提供了良好的标准和保证,包括:同步 或 异步 的消息分发,一次和仅一次的消息分发,消息接收 和 订阅 等等。
- 连接灵活性:ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/S,IP多播,SSL,TCP,UDP 等等。
- 支持的协议种类多:OpenWire、STOMP、REST、XMPP、AMQP;
- 持久化插件和安全插件:ActiveMQ 提供了 多种持久化 选择。
- 支持的客户端语言种类多:除了 Java 之外,还有:C/C++,.NET,Perl,PHP,Python,Ruby;
- 代理集群:多个 ActiveMQ代理 可以组成一个 集群 来提供服务;
- 异常简单的管理:ActiveMQ 是以开发者思维被设计的。
优点
- 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
- 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
- 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
- 支持 自动重连 和 错误重试机制;
- 有安全机制:支持基于 shiro,jaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权;
- 监控完善:拥有完善的 监控,包括 Web Console,JMX,Shell 命令行,Jolokia 的 RESTful API;
- 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio;
缺点
- 社区活跃度不及 RabbitMQ 高;
- 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息;
- 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
- 不适合用于 上千个队列 的应用场景;
3.4、RocketMQ
- 简介
RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。 - 架构图
基本术语
- Topic:一个Topic可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的Topic发送消息。
- Tag:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有tag;
- Producer:消息生产者;
- Broker:存储消息,以Topic为纬度轻量级的队列;
- Consumer:消息消费者,负责接收并消费消息;
- MessageQueue:消息的物理管理单位,一个Topic可以有多个Queue,Queue的引入实现了水平扩展的能力;
- NameServer:负责对原数据的管理,包括Topic和路由信息,每个NameServer之间是没有通信的;
- Group:一个组可以订阅多个Topic,ProducerGroup、ConsumerGroup分别是一类生产者和一类消费者;
- Offset:通过Offset访问存储单元,RocketMQ中所有消息都是持久化的,且存储单元定长。
- Consumer:支持PUSH和PULL两种消费模式,支持集群消费和广播消费。
优点
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型;
- 顺序队列:在一个队列中可靠的先进先出(FIFO)和严格的顺序传递;
- 支持拉(pull)和推(push)两种消息模式;
- 单一队列百万消息的堆积能力;
- 支持多种消息协议,如 JMS、MQTT 等;
- 分布式横向扩展架构
- 满足至少一次消息传递语义;
- 提供丰富的Dashboard,包含配置、指标和监控等。
缺点
- 支持的客户端语言不多,目前是java及c++,其中c++不成熟
- 社区活跃度一般
- 延时消息:开源版不支持任意时间精度,仅支持特定的level
3.5、Apache Pulsar
- 简介
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。 - 架构图
基本术语
- Property:代表租户,每个property都可以代表一个团队、一个功能、一个产品线。
- Namespace:Pulsar的基本管理单元,在namaspace级别可设置权限、消息TTL、Retention 策略等。
- Producer:数据生产方,负责创建消息并将消息投递到 Pulsar 中;
- Consumer:数据消费方,连接到 Pulsar接收消息并进行相应的处理;
- Broker:无状态Proxy服务,负责接收消息、传递消息、集群负载均衡等操作;
- BookKeeper:有状态,负责持久化存储消息。
- ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调、服务发现等;
- Topic:用作从producer到consumer传输消息。
- Ledger:即Segment,Pulsar底层数据以Ledger的形式存储在BookKeeper上。
- Fragment : 每个 Ledger 由若干 Fragment 组成。
优点
- 灵活扩容
- 无缝故障恢复
- 支持延时消息
- 内置的复制功能,用于跨地域复制如灾备
- 支持两种消费模型:流(独享模式)、队列(共享模式)
- 缺点
暂无
3.6、综上所述
- Kafka 在于 分布式架构,
- RabbitMQ 基于 AMQP 协议 来实现,
- RocketMQ 的思路来源于 Kafka,改成了 主从结构,在 事务性 和 可靠性 方面做了优化。
- 广泛来说,电商、金融 等对 事务一致性 要求很高的,可以考虑 RabbitMQ 和 RocketMQ,
- 对 性能要求高 的可考虑 Kafka。
4、举例说明
由于仅仅是demo,默认安装部署都会,就不再提了。下面只展示 Kafka 和 RocketMQ 的,其他都大同小异。
4.1、Kafka Demo
在用户添加成功的时候发送消息,将添加的参数作为消息body发送,控制层防止重复提交添加;监听器里接收消息,使用redisson分布式锁防止重复消费,使用ack手动确认,防止消息丢失;将接收到的消息存到刚插入的那行数据中。
4.1.1、部分效果
- 控制台信息
- 数据库
4.1.2、配置文件
- pom.xml
<dependencies>
<!--kafka的依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.1</version>
</dependency>
<!--spring boot 测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--spring web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--hutool 工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.22</version>
</dependency>
<!--mysql 驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--mybatis plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.2</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
- application.yml
server:
port: 9200
spring:
application:
name: llh-contract # 注册到eureka上面的应用名称
# Kafka配置
kafka:
bootstrap-servers: 192.168.126.137:9092 # 指定kafka地址,可以多个
# bootstrap-servers: 192.168.126.134:9092,192.168.126.135:9092,192.168.126.136:9092 # 指定kafka地址,可以多个
consumer:
enable-auto-commit: false # 关闭自动提交
group-id: test-consumer-group
auto-offset-reset: latest # 从当前时间开始接收
auto-commit-interval: 100
# 指定消息key和value的编码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 0表示不进行消息接收是否成功的确认,1表示当Leader接收成功时确认,-1表示Leader和Follower都接收成功时确认
acks: 1
# 重试机制
retries: 1
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 3355443
# 指定消息key和value的编码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# redis配置
redis:
host: 127.0.0.1
port: 6379
# 数据源
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/activiti?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&nullCatalogMeansCurrent=true
username: root
password: root
# mybatis plus配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# 日志管理
logging:
level:
root: info
- KafkaConfig.java
package com.llh.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
/**
* User: lilinhan
* DateTime: 2023/11/14 16:26
*/
@Component
@Configuration
public class KafkaConfig {
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory);
return factory;
}
}
4.1.3、部分代码
- 控制层:UserController.java
package com.llh.controller;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.json.JSONUtil;
import com.llh.domain.User;
import com.llh.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
* User: lilinhan
* DateTime: 2023/11/22 16:26
*/
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
RedisTemplate redisTemplate;
@Autowired
UserService userService;
@Autowired
KafkaTemplate kafkaTemplate;
@RequestMapping("/save")
public String save(User user){
String jsonStr = JSONUtil.toJsonStr(user);
String md5 = DigestUtil.md5Hex(jsonStr);
// 重复提交
Boolean b = redisTemplate.opsForValue().setIfAbsent(md5, md5, 1, TimeUnit.MINUTES);
if(!b){
return "一分钟禁止重复提交";
}
userService.save(user);
// 获取自增的id
String id = user.getId()+ "";
// 使用kafka发送消息并处理发送结果
// send参数的key和value值必须要一致!!
kafkaTemplate.send("test",id,jsonStr).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
System.err.println("Kafka----发送失败");
}
@Override
public void onSuccess(Object result) {
System.err.println("Kafka----发送成功:");
}
});
return "添加成功";
}
}
- 监听器:MyListener.java
package com.llh.listener;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.llh.domain.User;
import com.llh.service.UserService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* User: lilinhan
* DateTime: 2023/11/22 16:41
*/
@Component
public class MyListener {
@Autowired
RedissonClient redissonClient;
@Autowired
UserService userService;
// 使用redisson分布式锁和ack确认消息
@KafkaListener(topics = "test",containerFactory = "ackContainerFactory")
public void msg(ConsumerRecord consumerRecord, Acknowledgment ack){
Object key = consumerRecord.key();
// 使用redisson获取锁
RLock lock = redissonClient.getLock((String) key);
// 如果成功获取到锁
if(lock.tryLock()){
try {
JSON parse = JSONUtil.parse(consumerRecord.value());
System.err.println("收到的消息----"+parse);
// 根据自增id查询数据
User userDB = userService.getById((Serializable) key);
System.err.println("刚刚添加的数据----"+userDB);
userDB.setKafkaMsg(JSONUtil.toJsonStr(parse));
// 把接收到的json字符串存入mysql
userService.updateById(userDB);
// 手动确认消息
ack.acknowledge();
}catch (Exception e){
throw new RuntimeException(e);
}finally {
// 释放锁
if(lock!=null&&lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}else {
System.err.println("重复消费!!");
}
}
}
4.2、RocketMQ Demo
同步消息、异步消息、延迟消息、同步有序以及队列消息示例
4.2.1、部分效果
- 控制台打印
- 可视化界面
- 订阅主题
- 消费者
4.2.2、配置文件
- pom.xml
<dependencies>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!--spring web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rocketmq 依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
- application.yml
server:
port: 9203
spring:
application:
name: llh-mq # 注册导eureka上面的应用名称
# mq的配置
rocketmq:
name-server: 127.0.0.1:9876 # 服务地址
# 生产者
producer:
group: test #生产者组
retry-times-when-send-failed: 2 # 同步发送
retry-times-when-send-async-failed: 2 # 异步发送
# 消费者
consumer:
topic: top
pull-batch-size: 10 # 拉取数量
group: test
4.2.3、部分代码
- 控制层:ProducerController.java
package com.llh.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* User: lilinhan
* DateTime: 2023/10/27 8:48
*/
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
RocketMQTemplate rocketMQTemplate;
@RequestMapping("/put")
public void put(){
// 构建消息对象
Message<String> message = MessageBuilder.withPayload("hello2").build();
SendResult result = rocketMQTemplate.syncSend("sync", message, 500, 4);
if(SendStatus.SEND_OK.equals(result.getSendStatus())){
System.err.println("延迟消息发送成功");
}
}
// 异步发送
@RequestMapping("/async")
public void async(){
// 构建消息对象
Message<String> message = MessageBuilder.withPayload("hello").build();
rocketMQTemplate.asyncSend("async", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
SendStatus sendStatus = sendResult.getSendStatus();
// 确认消息
if(SendStatus.SEND_OK.equals(sendStatus)){
System.err.println("发送成功!消息----------"+sendStatus.name());
}else {
System.err.println("发送成功!消息----------"+sendStatus.name());
}
}
@Override
public void onException(Throwable throwable) {
System.out.println("mq连接失败");
}
});
}
// 同步发送
@RequestMapping("/sync")
public void sync(){
SendResult result = rocketMQTemplate.syncSend("sync", "hello1");
if(SendStatus.SEND_OK.equals(result.getSendStatus())){
System.err.println("发送成功");
}
rocketMQTemplate.syncSend("sync","hello2");
rocketMQTemplate.syncSend("sync","hello3");
rocketMQTemplate.syncSend("sync","hello4");
System.err.println("发送成功");
}
// 同步发送 有序消息
@RequestMapping("/sync2")
public void async2(){
rocketMQTemplate.syncSendOrderly("sync2", "订单1创建", "order1");
rocketMQTemplate.syncSendOrderly("sync2", "订单1支付", "order1");
rocketMQTemplate.syncSendOrderly("sync2", "订单1完成", "order1");
rocketMQTemplate.syncSendOrderly("sync2", "订单2创建", "order2");
rocketMQTemplate.syncSendOrderly("sync2", "订单2支付", "order2");
rocketMQTemplate.syncSendOrderly("sync2", "订单2完成", "order2");
}
}
- 实现RocketMQListener接口的监听器
package com.llh.mq;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* User: lilinhan
* DateTime: 2023/10/26 16:58
*/
@Component
@RocketMQMessageListener(topic = "sync",consumerGroup = "sdsd",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.err.println("接受的消息:"+s);
}
}
- 实现MessageListenerOrderly接口的监听器
package com.llh.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* User: lilinhan
* DateTime: 2023/10/27 11:26
*/
@Component
public class TestListener implements MessageListenerOrderly {
// 初始化mq
@PostConstruct
public void init() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr("127.0.0.1:9876"); // nameserv地址
consumer.subscribe("sync2","*"); // 订阅的主题
consumer.setConsumerGroup("dsf"); //消费者的组
consumer.setInstanceName("topname");
consumer.registerMessageListener(this); // 监听处理的对象
consumer.start();
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// 判断消息是否为空
if(CollectionUtils.isEmpty(list)){
return ConsumeOrderlyStatus.SUCCESS;
}
// 遍历消息队列
for (MessageExt messageExt : list) {
int queueId = messageExt.getQueueId();
String body = new String(messageExt.getBody());
System.err.println("这个队列:"+queueId+"-----消息:"+body+"-------"+Thread.currentThread().getId()+":"+Thread.currentThread().getName());
}
// 返回成功
return ConsumeOrderlyStatus.SUCCESS;
}
}
111 comments
这篇文章如同一首动人的乐章,触动了读者内心深处的柔软。
文章的叙述风格独特,用词精准,让人回味无穷。
独特的构思和新颖的观点,让这篇文章在众多作品中脱颖而出。
部分语句稍显冗长,可精简以增强节奏感。
平淡中见真章,质朴处显功力。
观点新颖,见解独到,发人深省。
意象选取精妙,营造出空灵意境。
场景转换稍显突兀,可增加过渡描写。
对权力结构的解构充满勇气与智慧。
文字流畅如丝,语言优美动人,读来令人心旷神怡。
文字流畅如丝,语言优美动人,读来令人心旷神怡。
对传统与现代的融合思考颇具启发性。
作者的布局谋篇匠心独运,让读者在阅读中享受到了思维的乐趣。
选材新颖独特,通过细节描写赋予主题鲜活生命力。
文字流畅如丝,语言优美动人,读来令人心旷神怡。
作者的布局谋篇匠心独运,让读者在阅读中享受到了思维的乐趣。
文字流畅如丝,语言优美动人,读来令人心旷神怡。
?语言类评语?
终极关怀的缺失可尝试补充升华。
?内容类评语?