强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

微服务拆分精讲 / 第 08 章:消息队列

第 08 章:消息队列

消息队列不只是"异步处理"的工具,它是微服务解耦、削峰填谷、最终一致性的基础设施。


8.1 为什么需要消息队列

8.1.1 同步通信的痛点

  问题1:强耦合
  ──────────────
  订单服务 ──直接调用──▶ 支付服务
  订单服务 ──直接调用──▶ 库存服务
  订单服务 ──直接调用──▶ 通知服务
  (任何一个服务不可用,订单创建就失败)

  问题2:性能瓶颈
  ──────────────
  创建订单 = 调用支付(100ms) + 调用库存(50ms) + 调用通知(200ms)
  总延迟 = 350ms(串行)   用户等待时间长

  问题3:流量洪峰
  ──────────────
  秒杀活动 → 10000 QPS → 直接打到数据库 → 数据库崩溃

8.1.2 消息队列的三大作用

┌──────────────────────────────────────────────────────────────┐
│                  消息队列核心价值                               │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  1. 异步处理 (Async Processing)                              │
│  ┌──────────┐    ┌─────┐    ┌──────────┐                   │
│  │ 订单服务  │───▶│ MQ  │───▶│ 通知服务  │                   │
│  └──────────┘    └─────┘    └──────────┘                   │
│  订单创建后立即返回,通知异步发送                              │
│                                                              │
│  2. 应用解耦 (Decoupling)                                    │
│  ┌──────────┐    ┌─────┐    ┌──────────┐                   │
│  │ 订单服务  │───▶│ MQ  │───▶│ 支付服务  │                   │
│  └──────────┘    │     │    └──────────┘                   │
│                  │     │───▶ ┌──────────┐                   │
│                  │     │    │ 库存服务  │                   │
│                  │     │    └──────────┘                   │
│                  └─────┘                                   │
│  新增消费者不需要修改生产者                                   │
│                                                              │
│  3. 削峰填谷 (Load Leveling)                                 │
│  ┌──────────┐    ┌─────────────────┐    ┌──────────┐      │
│  │ 10000QPS │───▶│ MQ (缓冲 100万) │───▶│ 1000QPS  │      │
│  │ 峰值流量  │    │ 慢慢消费         │    │ 稳定处理  │      │
│  └──────────┘    └─────────────────┘    └──────────┘      │
└──────────────────────────────────────────────────────────────┘

8.2 消息队列选型

8.2.1 主流 MQ 对比

维度KafkaRabbitMQRocketMQPulsar
开发语言Java/ScalaErlangJavaJava
吞吐量极高 (百万级/秒)中 (万级/秒)高 (十万级/秒)极高
延迟ms 级μs 级ms 级ms 级
消息模型发布-订阅 (Log)多种 (Queue/Topic)发布-订阅发布-订阅
消息顺序分区内有序队列内有序队列内有序分区内有序
消息回溯✅ 支持❌ 不支持✅ 支持✅ 支持
延迟消息❌ 需插件✅ 原生支持✅ 原生支持✅ 原生支持
事务消息✅ 支持✅ 支持✅ 支持✅ 支持
社区活跃度极高
适用场景大数据/日志/事件流业务消息/RPC电商/金融多租户/云原生

8.2.2 选型建议

  需求分析 → 选型

  大数据/日志/流处理    → Kafka
  复杂路由/企业集成     → RabbitMQ
  电商/金融/事务消息    → RocketMQ
  云原生/多租户        → Pulsar

  简单决策:
  ┌────────────────────────────────────────┐
  │ 吞吐量要求 > 10万/秒?                   │
  │   是 → Kafka / Pulsar                   │
  │   否 → 需要复杂路由?                     │
  │         是 → RabbitMQ                    │
  │         否 → 金融场景?                   │
  │               是 → RocketMQ             │
  │               否 → Kafka (通用选择)      │
  └────────────────────────────────────────┘

8.3 Kafka 深入

8.3.1 Kafka 架构

┌──────────────────────────────────────────────────────────────┐
│                     Kafka 架构                                │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Producer (生产者)                                           │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                     │
│  │ 订单服务  │  │ 用户服务  │  │ 商品服务  │                     │
│  └────┬────┘  └────┬────┘  └────┬────┘                     │
│       └────────────┼────────────┘                           │
│                    ▼                                         │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Kafka Cluster                           │   │
│  │                                                     │   │
│  │  ┌──────────────────────────────────────────────┐  │   │
│  │  │  Topic: order-events                         │  │   │
│  │  │  ┌──────────┬──────────┬──────────┐         │  │   │
│  │  │  │Partition0│Partition1│Partition2│         │  │   │
│  │  │  │ msg,msg  │ msg,msg  │ msg,msg  │         │  │   │
│  │  │  └──────────┴──────────┴──────────┘         │  │   │
│  │  └──────────────────────────────────────────────┘  │   │
│  │                                                     │   │
│  │  ┌──────────────┐  ┌──────────────┐                │   │
│  │  │  Broker 1    │  │  Broker 2    │  ...           │   │
│  │  └──────────────┘  └──────────────┘                │   │
│  │                                                     │   │
│  │  ┌──────────────┐                                  │   │
│  │  │ ZooKeeper /  │                                  │   │
│  │  │ KRaft (元数据)│                                  │   │
│  │  └──────────────┘                                  │   │
│  └─────────────────────────────────────────────────────┘   │
│                    │                                         │
│       ┌────────────┼────────────┐                           │
│       ▼            ▼            ▼                           │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                     │
│  │ 支付服务  │  │ 库存服务  │  │ 通知服务  │  Consumer Group   │
│  └─────────┘  └─────────┘  └─────────┘                     │
└──────────────────────────────────────────────────────────────┘

8.3.2 核心概念

概念说明
Topic消息的逻辑分类(如 order-events
PartitionTopic 的物理分区,保证分区内有序
BrokerKafka 服务器节点
Producer消息生产者
Consumer消息消费者
Consumer Group消费者组,组内每个分区只被一个消费者消费
Offset消费者在分区中的消费位置
Replication分区副本,保证高可用

8.3.3 Kafka 生产者配置

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 可靠性配置
props.put("acks", "all");                    // 所有副本确认
props.put("retries", 3);                     // 重试次数
props.put("enable.idempotence", true);        // 幂等生产者

// 性能配置
props.put("batch.size", 16384);               // 批量大小 16KB
props.put("linger.ms", 10);                   // 等待时间 10ms
props.put("compression.type", "lz4");         // 压缩算法

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(
    "order-events",
    orderId,    // key(同一 key 的消息进入同一分区)
    jsonPayload // value
);

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        log.info("消息发送成功: partition={}, offset={}",
            metadata.partition(), metadata.offset());
    } else {
        log.error("消息发送失败", exception);
    }
});

8.3.4 消费者组与分区再平衡

  消费者组 (Consumer Group)
  ─────────────────────────

  Topic: order-events (3 个分区)

  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │Partition0│  │Partition1│  │Partition2│
  └─────┬────┘  └─────┬────┘  └─────┬────┘
        │             │             │
        ▼             ▼             ▼
  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │Consumer A│  │Consumer B│  │Consumer C│
  └──────────┘  └──────────┘  └──────────┘
  (同一 Consumer Group)

  当 Consumer C 挂掉时:
  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │Partition0│  │Partition1│  │Partition2│
  └─────┬────┘  └─────┬────┘  └─────┬────┘
        │             │             │
        ▼             ▼             ▼
  ┌──────────┐  ┌──────────┐
  │Consumer A│  │Consumer B│  ← Consumer C 挂掉
  │ P0 + P2  │  │ P1       │    P2 重新分配给 A
  └──────────┘  └──────────┘

8.4 RabbitMQ

8.4.1 RabbitMQ 模型

┌──────────────────────────────────────────────────────────┐
│                   RabbitMQ 消息模型                        │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Producer ──▶ Exchange ──binding──▶ Queue ──▶ Consumer   │
│                                                          │
│  Exchange 类型:                                         │
│                                                          │
│  1. Direct (直连)                                        │
│     Exchange ──routing_key="order"──▶ Queue A            │
│                                                          │
│  2. Fanout (广播)                                        │
│     Exchange ─────────────────────▶ Queue A              │
│     Exchange ─────────────────────▶ Queue B              │
│     Exchange ─────────────────────▶ Queue C              │
│                                                          │
│  3. Topic (主题)                                         │
│     Exchange ──"order.*"──────────▶ Queue A              │
│     Exchange ──"order.created"────▶ Queue B              │
│     Exchange ──"#.payment"────────▶ Queue C              │
│                                                          │
│  4. Headers (头部)                                       │
│     Exchange ──按 Header 匹配────▶ Queue                 │
└──────────────────────────────────────────────────────────┘

8.4.2 RabbitMQ vs Kafka 选型

场景KafkaRabbitMQ
日志收集/流处理✅ 首选
大数据管道✅ 首选
复杂路由规则⚠️ 需要设计✅ 原生支持
请求-应答模式⚠️ 不原生✅ 原生支持
延迟队列⚠️ 需插件✅ 原生支持
消息确认机制✅ Offset✅ ACK
低延迟(μs 级)❌ ms 级✅ μs 级

8.5 事件驱动架构(EDA)

8.5.1 事件驱动的核心思想

  命令式(Imperative)          事件驱动(Event-Driven)
  ───────────────────          ──────────────────────

  订单服务                       订单服务
  "你应该扣库存"                 "订单已创建"(不知道谁关心)
      │                              │
      ▼                              ▼
  库存服务                       [事件总线]
  (强耦合,订单服务知道库存服务)      ├──▶ 库存服务(自己订阅)
                                   ├──▶ 支付服务(自己订阅)
                                   └──▶ 通知服务(自己订阅)
                                   (松耦合,生产者不知道消费者)

8.5.2 事件设计原则

原则说明示例
事件自描述事件包含足够的上下文OrderCreated{orderId, items, amount, customer}
事件不可变事件一旦发布不能修改使用过去时命名
事件可重放新消费者可以从头消费Kafka 保留期
事件版本化事件结构变更需要版本管理OrderCreated_v2
携带因果 ID便于追踪事件链correlationId

8.5.3 事件 Schema 管理

// Schema Registry 中注册的事件 Schema
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.example.events",
  "version": "2",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "eventType", "type": "string", "default": "OrderCreated"},
    {"name": "timestamp", "type": "long"},
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string", "default": "CNY"},
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    }
  ]
}

8.6 最终一致性模式

8.6.1 事务性发件箱(Transactional Outbox)

问题:如何保证"写数据库"和"发消息"同时成功?

  ❌ 不可靠的方式:
  写数据库 ──成功──▶ 发消息 ──失败──▶ 数据库已写,消息没发(不一致)

  ✅ 事务性发件箱:
  ┌─────────────────────────────────────────────────┐
  │  数据库事务                                       │
  │  ┌────────────────────────────────────────────┐ │
  │  │  INSERT INTO orders ...                    │ │
  │  │  INSERT INTO outbox (event_type, payload)  │ │ ← 同一个事务
  │  └────────────────────────────────────────────┘ │
  └─────────────────────────────────────────────────┘
                          │
                          ▼
  ┌──────────────────────────────────────────────────┐
  │  Outbox Poller / CDC                             │
  │  读取 outbox 表 → 发送到消息队列                   │
  └──────────────────────────────────────────────────┘
                          │
                          ▼
  ┌──────────────────────────────────────────────────┐
  │  消息队列 (Kafka)                                 │
  └──────────────────────────────────────────────────┘

8.6.2 实现代码示例

@Transactional
public Order createOrder(CreateOrderCommand command) {
    // 1. 创建订单(写入 orders 表)
    Order order = orderRepository.save(new Order(command));

    // 2. 将事件写入 outbox 表(同一个事务)
    OutboxEvent event = new OutboxEvent(
        "OrderCreated",
        orderId,
        objectMapper.writeValueAsString(new OrderCreatedEvent(order))
    );
    outboxRepository.save(event);

    // 事务提交后,outbox poller 会读取并发送到 Kafka
    return order;
}

8.6.3 消费者幂等处理

消息队列至少投递一次(At-Least-Once),消费者必须幂等:

public void handleOrderCreated(OrderCreatedEvent event) {
    // 幂等检查:根据消息 ID 判断是否已处理
    if (messageProcessedRepository.exists(event.getEventId())) {
        log.info("消息已处理,跳过: {}", event.getEventId());
        return;
    }

    try {
        // 业务处理
        inventoryService.deductStock(event.getItems());

        // 记录已处理的消息
        messageProcessedRepository.save(new ProcessedMessage(event.getEventId()));
    } catch (Exception e) {
        log.error("处理失败,等待重试", e);
        throw e; // 抛出异常,消息会重新投递
    }
}

8.7 业务场景:电商订单事件流

  ┌──────────────────────────────────────────────────────────────┐
  │              订单创建完整事件流                                │
  ├──────────────────────────────────────────────────────────────┤
  │                                                              │
  │  1. 用户下单                                                 │
  │  ┌──────────┐                                               │
  │  │ 订单服务  │  写入订单 + 写入 outbox                        │
  │  └────┬─────┘                                               │
  │       │ OrderCreated                                        │
  │       ▼                                                     │
  │  ┌─────────────────────────────────────────────────────┐   │
  │  │                Kafka: order-events                   │   │
  │  └──┬──────────┬──────────┬──────────┬────────────────┘   │
  │     │          │          │          │                     │
  │     ▼          ▼          ▼          ▼                     │
  │  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐                 │
  │  │支付  │  │库存  │  │通知  │  │积分  │                 │
  │  │服务  │  │服务  │  │服务  │  │服务  │                 │
  │  └──┬───┘  └──┬───┘  └──┬───┘  └──┬───┘                 │
  │     │         │         │         │                      │
  │     │ PaymentCompleted  │         │                      │
  │     ▼         │         │         │                      │
  │  ┌────────┐   │         │         │                      │
  │  │ order- │   │         │         │                      │
  │  │ events │   │         │         │                      │
  │  └──┬─────┘   │         │         │                      │
  │     │         ▼         ▼         │                      │
  │     │    ┌──────┐  ┌──────┐      │                      │
  │     │    │库存  │  │订单  │      │                      │
  │     │    │扣减  │  │更新  │      │                      │
  │     │    └──────┘  └──────┘      │                      │
  │     │                             │                      │
  │     │ OrderShipped                │                      │
  │     ▼                             ▼                      │
  │  ┌──────┐                   ┌──────┐                    │
  │  │通知  │                   │积分  │                    │
  │  │服务  │                   │增加  │                    │
  │  └──────┘                   └──────┘                    │
  └──────────────────────────────────────────────────────────┘

⚠️ 注意事项

  1. 消息不是万能的——不要把所有通信都改成异步,有些场景需要同步响应
  2. 消费者必须幂等——At-Least-Once 语义下消息可能重复
  3. 消息顺序性——Kafka 只保证分区内有序,需要有序的消息用相同 key
  4. 死信队列——消费多次失败的消息要进入死信队列,不能无限重试
  5. 消息体大小——大消息(> 1MB)考虑存外部存储,消息只传引用
  6. 监控消费延迟——Consumer Lag 是重要的运维指标

📖 扩展阅读

  1. Apache Kafka Documentation (kafka.apache.org) — Kafka 官方文档
  2. RabbitMQ Documentation (rabbitmq.com) — RabbitMQ 官方文档
  3. Designing Event-Driven Systems — Ben Stopford (Confluent) — 免费电子书
  4. Microservices Patterns Chapter 6 — Chris Richardson — 事务性发件箱模式
  5. Building Event-Driven Microservices — Adam Bellemare — 事件驱动架构实践

本章小结

要点说明
MQ 作用异步处理、应用解耦、削峰填谷
Kafka vs RabbitMQKafka 适合高吞吐/流处理,RabbitMQ 适合复杂路由
事件设计自描述、不可变、可重放、版本化
最终一致性事务性发件箱 + 消费者幂等
关键指标消费延迟 (Lag)、消息积压、消费失败率

📌 下一章第 09 章:服务网格 — 用服务网格(Istio/Linkerd)管理服务间通信。