消息队列
想象一家生意火爆的奶茶店。如果顾客点了单、店员立刻做、做完立刻喊号——顾客挤满吧台,店员手忙脚乱。聪明的老板会在前台放一个取号牌架:顾客点单后拿号走开,店员按号做,做完叫号——大家各跑各的,互不阻塞。
这个”取号牌架”就是消息队列(Message Queue,MQ)。它是个中间人——生产者把消息丢进来就走,消费者按自己的节奏取。简单到不能再简单,却解决了三个让系统头疼的问题:异步、解耦、削峰。
这一章我们看消息队列为什么这么重要,RabbitMQ 和 Kafka 两大主流长什么样,以及 Java 怎么和它们打交道。
一、为什么需要消息队列
1.1 三大作用
异步 —— 用户注册时,要发欢迎邮件、发短信、送积分。如果同步调用,用户要等所有事情做完才返回;用消息队列,主流程只把”注册成功”消息丢进队列就返回,邮件/短信/积分各消费各的——用户感觉飞快。
解耦 —— 订单服务下单后要通知库存、物流、积分服务。如果直接调用,订单服务要依赖三个服务,任一挂了订单就失败。用消息队列,订单只管发消息,谁消费、怎么消费、何时消费,订单不关心——订单与下游服务解耦。
削峰 —— 秒杀活动瞬时 10 万请求,数据库扛不住。请求先丢进消息队列,消费者按数据库能承受的速度慢慢处理——峰值被队列”削平”了。
1.2 代价
消息队列不是免费的午餐:
- 复杂度上升——多一个组件,多一种故障可能。
- 延迟增加——异步意味着不立即可见,用户要等消费者处理完。
- 一致性问题——消息可能丢失、重复、乱序,需要专门处理。
二、RabbitMQ:灵巧的邮差
RabbitMQ 是 Erlang 写的消息中间件,基于 AMQP(Advanced Message Queuing Protocol)协议。它的模型像一个邮政系统——发件人不知道收件人是谁,只把信投到邮筒(Exchange),邮局按规则(Binding)把信投到信箱(Queue),收件人从信箱取信。
2.1 核心概念
Producer → Exchange ──(binding)──→ Queue → Consumer
- Producer(生产者)——发消息的。
- Exchange(交换机)——消息的”分拣中心”,按规则把消息路由到一个或多个队列。
- Queue(队列)——消息存储的地方,FIFO。
- Binding(绑定)——Exchange 和 Queue 之间的关联规则。
- Consumer(消费者)——取消息的。
2.2 四种 Exchange 类型
| 类型 | 路由规则 | 场景 |
|---|---|---|
| Direct | 按 routing key 完全匹配 | 点对点定向 |
| Fanout | 广播到所有绑定的队列 | 群发 |
| Topic | 按 routing key 模式匹配(通配符) | 订阅子集 |
| Headers | 按 header 匹配(不用 routing key) | 复杂条件 |
Direct 例子——routing key 是 order.create,只有绑定了 order.create 的队列能收到。
Fanout 例子——广播日志,所有日志队列都收到同一条消息。
Topic 例子——routing key 是 order.create.paid,绑定模式 order.create.* 能匹配,order.# 也能匹配(* 匹配一段,# 匹配多段)。
2.3 Java 客户端示例
// 生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 发消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", "hello", null, message.getBytes());
}
// 消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("hello", false, false, false, null);
DeliverCallback callback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到: " + message);
};
channel.basicConsume("hello", true, callback, consumerTag -> {});
2.4 Spring Boot 整合
Spring Boot 把 RabbitMQ 封装得极简——用注解消费,用 RabbitTemplate 发送:
// 生产者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbit;
public void createOrder(Order order) {
rabbit.convertAndSend("order.exchange", "order.create", order);
}
}
// 消费者
@Component
public class OrderConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("order.queue"),
exchange = @Exchange("order.exchange"),
key = "order.create"
))
public void handle(Order order) {
System.out.println("处理订单: " + order.getId());
}
}
三、Kafka:日志的巨人
Kafka 由 LinkedIn 开源,用 Scala 写。它的设计哲学和 RabbitMQ 完全不同——RabbitMQ 像邮局,Kafka 像日志。消息按顺序追加到一个不可变日志,消费者各自维护”读到哪了”(offset)。
3.1 核心概念
- Topic(主题)——消息分类,类似 RabbitMQ 的 routing key。
- Partition(分区)——一个 Topic 切成多个分区,分布在不同 broker,是 Kafka 水平扩展的关键。
- Offset(偏移量)——消费者在分区中的位置,由消费者自己管理。
- Consumer Group(消费者组)——同组消费者分担消费一个 Topic,每个分区只被组内一个消费者消费。
- Broker(代理)——Kafka 集群中的一个服务器节点。
Topic: order-events (3 partitions)
Partition 0: [msg0, msg1, msg4, msg7, ...]
Partition 1: [msg2, msg3, msg5, msg8, ...]
Partition 2: [msg6, msg9, msg10, ...]
3.2 Kafka vs RabbitMQ
| 维度 | RabbitMQ | Kafka |
|---|---|---|
| 模型 | 队列(推/拉) | 分区日志(拉) |
| 消息保留 | 消费后删除 | 按时间/大小保留 |
| 吞吐量 | 万级 QPS | 百万级 QPS |
| 延迟 | 微秒级 | 毫秒级 |
| 顺序 | 队列内有序 | 分区内有序 |
| 场景 | 业务消息、任务分发 | 日志、流处理、事件溯源 |
简单说——RabbitMQ 适合业务消息(订单、通知、任务),Kafka 适合大数据流(日志、监控、用户行为)。
3.3 Java 客户端示例
// 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-events", "order-1", "created"));
producer.close();
// 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-group");
props.put("key.deserializer", "StringDeserializer");
props.put("value.deserializer", "StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("order-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> r : records) {
System.out.println("收到: " + r.value());
}
}
3.4 Spring Boot 整合
// 生产者
@Service
public class EventPublisher {
@Autowired
private KafkaTemplate<String, String> kafka;
public void publish(String event) {
kafka.send("order-events", event);
}
}
// 消费者
@Component
public class EventConsumer {
@KafkaListener(topics = "order-events", groupId = "order-group")
public void handle(String event) {
System.out.println("消费: " + event);
}
}
四、消息可靠性:不丢不重不乱
消息队列的三大难题——消息丢失、消息重复、消息乱序。
4.1 生产者确认:消息不丢
生产者发消息,怎么知道 broker 收到了?
- RabbitMQ——开启 publisher confirm,broker 收到后回 ack。
- Kafka——配置
acks=all,所有副本都确认才算成功。
// Kafka 生产者配置
props.put("acks", "all"); // 所有副本确认
props.put("retries", Integer.MAX_VALUE); // 失败重试
props.put("enable.idempotence", "true"); // 幂等生产
4.2 消费者确认:处理完再确认
消费者取到消息,处理到一半崩了,消息算消费了吗?
- RabbitMQ——手动 ACK,处理完才告诉 broker”我消费完了”。自动 ACK 会在消息一发出就确认,处理失败消息就丢了。
- Kafka——手动提交 offset,处理完再
consumer.commitSync()。
// RabbitMQ 手动 ACK
channel.basicConsume("queue", false, (tag, delivery) -> {
try {
process(delivery); // 处理业务
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 确认
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); // 重新入队
}
}, tag -> {});
4.3 死信队列:处理失败的消息
消息消费失败、过期、队列满了,会被丢到死信队列(Dead Letter Queue,DLQ)。死信队列可以被另一个消费者监听,做告警、重试、人工处理。
// RabbitMQ 死信队列: 绑定时指定 x-dead-letter-exchange
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("main.queue", false, false, false, args);
4.4 幂等:消息不重复处理
网络抖动会让消息重复投递——同一条订单消息消费两次,库存扣两次就糟了。幂等(Idempotent)——同一操作执行多次结果一致。
常用方案:
- 业务唯一键——消费前查”这条消息处理过没”,处理过就跳过。
- 数据库唯一约束——用消息 ID 作为唯一键,重复插入会失败。
- Redis 去重——
SETNX msgId 1,已存在就跳过。
@Transactional
public void consume(OrderMessage msg) {
if (redis.setNx("msg:" + msg.getId(), "1", 86400)) {
// 第一次处理
orderService.process(msg);
} else {
log.info("消息 {} 已处理, 跳过", msg.getId());
}
}
4.5 顺序:分区内有序
Kafka 保证分区内有序——同一 key 的消息进同一分区,消费时按 offset 顺序。跨分区不保证。
RabbitMQ 队列内 FIFO——天然有序。但多消费者分担消费时,处理快慢不同,最终顺序可能乱——需要单消费者或顺序锁。
五、其他常见消息队列
- RocketMQ——阿里开源,Java 写,事务消息支持好,国内电商常用。
- ActiveMQ——老牌,JMS 规范实现,逐渐被取代。
- Pulsar——Yahoo 开源,存算分离,云原生新秀。
六、实战:用 Java SE 模拟消息队列
Piston 在线环境跑不了 RabbitMQ/Kafka 服务器。我们用 Java SE 的 BlockingQueue 模拟一个迷你消息队列——支持 Topic 订阅、多消费者、消息确认、死信处理,演示消息队列的核心机制。
观察重点:消费者处理失败会重试,超过 3 次进死信队列;同 ID 消息被幂等去重,不会重复处理;削峰场景下,生产者瞬时发 20 条,消费者按自己节奏慢慢消费——队列吸收了峰值。
七、消息队列的取舍
选 MQ 时要考虑:
| 需求 | 推荐 |
|---|---|
| 业务消息、任务分发、延迟队列 | RabbitMQ |
| 高吞吐日志、流处理、事件溯源 | Kafka |
| 事务消息、电商订单 | RocketMQ |
| 简单内部异步 | Redis Stream / Java BlockingQueue |
不要为了用 MQ 而用——内部简单异步用 BlockingQueue 就够了,引入 MQ 等于引入运维负担。
八、本章小结
| 知识点 | 要点 |
|---|---|
| 三大作用 | 异步、解耦、削峰 |
| RabbitMQ 模型 | Exchange → Binding → Queue |
| 四种 Exchange | Direct(点对点)/ Fanout(广播)/ Topic(通配)/ Headers |
| Kafka 模型 | Topic → Partition → Offset |
| Kafka vs RabbitMQ | RabbitMQ 业务消息,Kafka 大数据流 |
| 可靠性 | 生产者确认 + 消费者手动 ACK + 死信队列 |
| 幂等 | 业务唯一键 / 数据库唯一约束 / Redis SETNX |
| 顺序 | Kafka 分区内有序,RabbitMQ 队列内有序 |
记忆口诀
- 三作用——异(异步)解(解耦)削(削峰)。
- RabbitMQ 四路由——直(Direct)广(Fanout)题(Topic)头(Headers)。
- Kafka 三件套——主(Topic)分(Partition)偏(Offset)。
- 可靠性三招——生产确认、消费 ACK、死信兜底。
- 幂等三法——唯一键、唯一约束、SETNX 去重。
结语
消息队列让系统从”同步紧耦合”走向”异步松耦合”——生产者和消费者各跑各的,靠队列这座桥传递消息。它换来了吞吐和韧性,代价是延迟、复杂度、一致性的权衡。
下一章我们看微服务——当单体应用大到无法维护时,怎么把它拆成多个独立服务,每个服务各自部署、各自演进。Spring Cloud、Nacos、Gateway、Sentinel,这是云原生时代的全家桶。