麦瓣健康麦瓣健康
首页
  • APP产品开发方案
  • 商业调查报告
  • 后端技术架构
  • Docker Compose部署指南
  • 技师端-功能模块与微服务对应关系
  • 数据库设计
  • 分布式事务一致性
  • 日志管理与配置
  • Netdata监控系统
  • 系统总览
  • 文档导航
  • 代码审计智能体
  • 测试生成智能体
  • 运维诊断智能体
  • APP测试智能体
  • API自动化测试多智能体协作系统
  • 项目规划
  • 开发工作手册
  • 开发周期管理
  • 任务看板总览
  • Week 3任务看板
  • Week 3周例会
  • Week 4任务看板
  • Week 5任务看板
  • Week 6任务看板
  • APP测试设备采购清单
  • 用户端APP
  • 用户端APP功能脑图
  • 技师端APP
  • 技师端APP功能脑图
  • 后台管理
  • 大数据屏幕
  • 技师端-账户系统需求明细
原型图(Demo)
GitHub
首页
  • APP产品开发方案
  • 商业调查报告
  • 后端技术架构
  • Docker Compose部署指南
  • 技师端-功能模块与微服务对应关系
  • 数据库设计
  • 分布式事务一致性
  • 日志管理与配置
  • Netdata监控系统
  • 系统总览
  • 文档导航
  • 代码审计智能体
  • 测试生成智能体
  • 运维诊断智能体
  • APP测试智能体
  • API自动化测试多智能体协作系统
  • 项目规划
  • 开发工作手册
  • 开发周期管理
  • 任务看板总览
  • Week 3任务看板
  • Week 3周例会
  • Week 4任务看板
  • Week 5任务看板
  • Week 6任务看板
  • APP测试设备采购清单
  • 用户端APP
  • 用户端APP功能脑图
  • 技师端APP
  • 技师端APP功能脑图
  • 后台管理
  • 大数据屏幕
  • 技师端-账户系统需求明细
原型图(Demo)
GitHub
  • 技术架构

    • 麦瓣健康后端技术架构文档
    • /Docker部署指南.html
    • 技师端功能模块与微服务对应关系
    • 数据库设计文档
    • 麦瓣健康 - 微服务分布式事务一致性设计方案
    • 日志管理与配置说明
    • Netdata 监控系统技术文档

麦瓣健康 - 微服务分布式事务一致性设计方案

项目名称: 麦瓣健康 - 分布式事务一致性技术方案
核心方案: 本地消息表 + RabbitMQ(最终一致性)
文档版本: v2.0(生产级优化版)
最后更新: 2025-10-21


一、唯一最优解:本地消息表 + MQ

1.1 为什么不用 Seata

性能对比:

指标本地消息表+MQSeata AT
响应时间80ms500ms (6倍慢)
吞吐量800 TPS100 TPS (8倍低)
复杂度低高(需Seata Server)
稳定性高有单点风险

结论:Seata性能代价太大,不使用。

1.2 核心设计思想

三步走策略:

1. 本地事务:保证核心数据一致(订单状态+消息记录)
2. 定时扫描:发送消息到MQ(可靠性保证)
3. 幂等消费:异步处理业务(性能优化)

适用场景(100%业务):

  • ✅ 订单完成 → 积分发放
  • ✅ 支付成功 → 通知技师
  • ✅ 用户下单 → 发送短信
  • ✅ 技师完单 → 佣金计算
  • ✅ 会员开通 → 发放优惠券
  • ✅ 退款操作 → 余额返还

二、系统架构设计

2.0 整体架构图

2.0.1 微服务全景架构

┌──────────────────────────────────────────────────────────────────────────────────┐
│                           麦瓣健康 - 微服务架构全景图                               │
├──────────────────────────────────────────────────────────────────────────────────┤
│                                                                                  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │  用户端APP   │  │  技师端APP   │  │  管理后台    │  │  H5/APP   │            │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘            │
│         │                 │                 │                 │                   │
│         └─────────────────┴─────────────────┴─────────────────┘                   │
│                                     │                                             │
│                         ┌───────────▼───────────┐                                │
│                         │   API Gateway (网关)   │                                │
│                         │   - 鉴权、限流、路由    │                                │
│                         └───────────┬───────────┘                                │
│                                     │                                             │
│         ┌───────────────────────────┼───────────────────────────┐                │
│         │                           │                           │                │
│  ┌──────▼──────┐           ┌───────▼────────┐         ┌───────▼────────┐        │
│  │  用户服务    │           │   订单服务      │         │   支付服务      │        │
│  │  (8081)     │           │   (8083)       │         │   (8084)       │        │
│  │             │           │  ┌──────────┐  │         │                │        │
│  │ • 用户管理   │◄──────────┤  │ 本地消息表 │  ├────────►│ • 支付流水    │        │
│  │ • 地址管理   │   查询用户  │  │ (Outbox) │  │  发起支付 │ • 退款管理    │        │
│  │ • 家庭成员   │           │  └─────┬────┘  │         │                │        │
│  └─────────────┘           │        │       │         └────────────────┘        │
│         │                  │  ┌─────▼────┐  │                  │                │
│         │                  │  │ 扫描任务  │  │                  │                │
│  ┌──────▼──────┐           │  │ (每5秒)  │  │         ┌────────▼────────┐        │
│  │  技师服务    │           │  └─────┬────┘  │         │   营销服务       │        │
│  │  (8082)     │           │        │       │         │   (8090)        │        │
│  │             │           │        │       │         │                 │        │
│  │ • 技师认证   │◄──────────┤  ┌─────▼────┐  ├────────►│ • 优惠券        │        │
│  │ • 评价管理   │   查询技师  │  │ 事件去重表 │  │  扣券积分 │ • 积分管理      │        │
│  │ • 师徒体系   │           │  │ (Dedup)  │  │         │ • 红包活动      │        │
│  └─────────────┘           │  └──────────┘  │         └─────────────────┘        │
│         │                  └────────┬────────┘                  │                │
│         │                           │                           │                │
│  ┌──────▼──────┐                    │                  ┌────────▼────────┐        │
│  │ 健康管理服务 │                    │                  │   商城服务       │        │
│  │  (8087)     │                    │                  │   (8091)        │        │
│  │             │                    │                  │                 │        │
│  │ • 健康档案   │◄───────────────────┼─────────────────►│ • 商品管理      │        │
│  │ • 护理日志   │      事件通知       │      下单购买     │ • 购物车        │        │
│  │ • IoT设备   │                    │                  │ • 物流跟踪      │        │
│  └─────────────┘                    │                  └─────────────────┘        │
│                                     │                                             │
│         ┌───────────────────────────┼───────────────────────────┐                │
│         │                           │                           │                │
│  ┌──────▼──────┐           ┌───────▼────────┐         ┌───────▼────────┐        │
│  │  IM消息服务  │           │  运营管理服务   │         │  RabbitMQ      │        │
│  │  (8085)     │           │   (8092)       │         │  消息队列       │        │
│  │             │           │                │         │                │        │
│  │ • 即时聊天   │           │ • 审核管理      │         │ ┌────────────┐ │        │
│  │ • 推送通知   │           │ • 数据统计      │         │ │ 点对点队列  │ │        │
│  │ • MongoDB   │           │ • 权限配置      │         │ │GRANT_POINTS│ │        │
│  └─────────────┘           └────────────────┘         │ │COMMISSION  │ │        │
│                                                        │ │SEND_SMS... │ │        │
│                                                        │ └────────────┘ │        │
│                                                        │ ┌────────────┐ │        │
│                                                        │ │  延迟队列   │ │        │
│                                                        │ │  (重试)    │ │        │
│                                                        │ └────────────┘ │        │
│                                                        │ ┌────────────┐ │        │
│                                                        │ │    DLQ     │ │        │
│                                                        │ │ (死信队列)  │ │        │
│                                                        │ └────────────┘ │        │
│                                                        └────────────────┘        │
│                                                                                  │
│  ┌──────────────────────────────────────────────────────────────────────────┐  │
│  │                           基础设施层                                        │  │
│  ├──────────────────────────────────────────────────────────────────────────┤  │
│  │  MySQL 8.0 (主从)  │  Redis 7.x  │  MongoDB 5.0  │  OSS  │  Prometheus  │  │
│  └──────────────────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────────────────────┘

2.0.2 分布式事务流程架构(核心)

┌────────────────────────────────────────────────────────────────────────────────┐
│                    分布式事务一致性保证 - Outbox 模式                            │
└────────────────────────────────────────────────────────────────────────────────┘

  【业务场景示例:用户下单 → 扣库存 → 支付 → 发积分 → 通知技师】

┌─────────────────────────────────────────────────────────────────────────────┐
│  阶段1:本地事务(订单服务)- 保证原子性                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────────┐                                                      │
│   │   订单服务 DB     │                                                      │
│   │   (MySQL)        │                                                      │
│   └────────┬─────────┘                                                      │
│            │                                                                │
│   BEGIN TRANSACTION;                                                        │
│   ┌────────▼──────────────────────────────────────────────┐                │
│   │ 1. UPDATE orders SET status='PAID'                    │                │
│   │    WHERE order_id = 12345;                            │                │
│   │                                                        │                │
│   │ 2. UPDATE sku_stock SET stock = stock - 1            │                │
│   │    WHERE sku_id = 789;                                │                │
│   │                                                        │                │
│   │ 3. INSERT INTO local_message_table VALUES(           │                │
│   │      event_id: 'uuid-001',                           │                │
│   │      biz_type: 'ORDER_PAID',                         │                │
│   │      routing_key: 'order.paid',                      │                │
│   │      content: '{"orderId":12345,"userId":100,...}',  │                │
│   │      status: 'PENDING'                               │                │
│   │    );                                                 │                │
│   │                                                        │                │
│   │ 4. INSERT INTO local_message_table VALUES(           │                │
│   │      event_id: 'uuid-002',                           │                │
│   │      biz_type: 'GRANT_POINTS',                       │                │
│   │      routing_key: 'points.grant',                    │                │
│   │      content: '{"orderId":12345,"points":50,...}',   │                │
│   │      status: 'PENDING'                               │                │
│   │    );                                                 │                │
│   └───────────────────────────────────────────────────────┘                │
│   COMMIT;   ✅ 订单状态、库存、消息记录 100% 一致                             │
│                                                                             │
│   响应用户:200 OK "下单成功"  (耗时 < 100ms)                                │
└─────────────────────────────────────────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  阶段2:异步发布(订单服务)- 可靠投递                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────────┐         ┌──────────────────┐                        │
│   │  定时扫描任务     │         │  RabbitMQ       │                        │
│   │  (每5秒执行)     │         │  消息代理        │                        │
│   └────────┬─────────┘         └────────▲─────────┘                        │
│            │                            │                                  │
│   ┌────────▼────────────────────────────┴────────────────┐                │
│   │ SELECT id, event_id, routing_key, content            │                │
│   │ FROM local_message_table                             │                │
│   │ WHERE status = 'PENDING'                             │
│   │   AND next_retry_time <= NOW()                       │                │
│   │ ORDER BY next_retry_time ASC                         │                │
│   │ LIMIT 200                                             │                │
│   │ FOR UPDATE SKIP LOCKED;  ← 多实例并发安全              │                │
│   │                                                       │                │
│   │ UPDATE SET status = 'SENDING';  ← 中间态防重           │                │
│   └───────────────────────────────────────────────────────┘                │
│                                                                             │
│   ┌───────────────────────────────────────────────────────┐                │
│   │ FOR EACH message:                                     │                │
│   │   rabbitTemplate.convertAndSend(                     │                │
│   │     exchange: "maiban.events",                       │                │
│   │     routingKey: message.routing_key,                 │                │
│   │     body: message.content,                           │                │
│   │     messageId: message.event_id,                     │                │
│   │     correlationData: new CorrelationData(event_id)   │                │
│   │   );                                                  │                │
│   └───────────────────────────────────────────────────────┘                │
│                                                                             │
│   ┌───────────────────────────────────────────────────────┐                │
│   │  Publisher Confirm 回调:                              │                │
│   │  ✅ ACK  → UPDATE status='SENT', sent_at=NOW()        │                │
│   │  ❌ NACK → UPDATE status='PENDING', retry_count+1,    │                │
│   │           next_retry_time = NOW() + 退避延迟            │                │
│   └───────────────────────────────────────────────────────┘                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  阶段3:幂等消费(下游服务)- 最终一致性                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │  营销服务 (8090) - 监听队列: points.grant                             │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                     │                                       │
│   ┌─────────────────────────────────▼────────────────────────────┐         │
│   │ @RabbitListener(queues = "GRANT_POINTS.queue")               │         │
│   │ public void handle(Message msg, Channel channel) {           │         │
│   │                                                               │         │
│   │   String eventId = msg.getMessageProperties().getMessageId();│         │
│   │                                                               │         │
│   │   // 1️⃣  幂等检查(持久化去重表)                              │         │
│   │   if (!dedupRepo.markConsumed(eventId)) {                    │         │
│   │     channel.basicAck(deliveryTag, false);                    │         │
│   │     return; // 已处理过,跳过                                 │         │
│   │   }                                                           │         │
│   │                                                               │         │
│   │   try {                                                       │         │
│   │     // 2️⃣  执行业务逻辑                                        │         │
│   │     PointsEvent event = JSON.parse(msg.getBody());           │         │
│   │     pointsService.grant(event.getUserId(), 50);              │         │
│   │                                                               │         │
│   │     // 3️⃣  手动 ACK(业务成功后)                              │         │
│   │     channel.basicAck(deliveryTag, false);                    │         │
│   │                                                               │         │
│   │     // 4️⃣  更新出库表状态(可选,用于对账)                     │         │
│   │     messageMapper.updateCompleted(eventId);                  │         │
│   │                                                               │         │
│   │   } catch (Exception e) {                                    │         │
│   │     // 5️⃣  NACK 不重回队列,进入延迟队列/DLQ                   │         │
│   │     channel.basicNack(deliveryTag, false, false);            │         │
│   │   }                                                           │         │
│   │ }                                                             │         │
│   └───────────────────────────────────────────────────────────────┘         │
│                                                                             │
│  ✅ 结果:用户积分已发放,数据最终一致                                         │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
                                     │
                         ┌───────────┴───────────┐
                         │                       │
                 ✅ 消费成功                ❌ 消费失败(超3次)
                         │                       │
                         ▼                       ▼
            ┌─────────────────────┐   ┌──────────────────────┐
            │  更新状态: COMPLETED │   │   进入 DLQ 死信队列   │
            │  event_dedup 保留7天 │   │   告警 + 人工介入     │
            └─────────────────────┘   └──────────────────────┘

2.0.3 数据流与状态机

┌────────────────────────────────────────────────────────────────┐
│              本地消息表状态机 (local_message_table)              │
└────────────────────────────────────────────────────────────────┘

        ┌─────────┐
        │ PENDING │ ← 初始状态(业务事务插入)
        └────┬────┘
             │
             │ 扫描任务抢占(FOR UPDATE SKIP LOCKED)
             ▼
        ┌─────────┐
        │ SENDING │ ← 中间态(正在发送MQ)
        └────┬────┘
             │
      ┌──────┴──────┐
      │             │
   ACK确认       NACK失败
      │             │
      ▼             ▼
 ┌────────┐   ┌──────────┐
 │  SENT  │   │ PENDING  │ ← 回到待发送(增加重试次数)
 └────────┘   └────┬─────┘
                   │
                   │ 重试超限(>5次)
                   ▼
              ┌─────────┐
              │ FAILED  │ ← 失败终态(告警人工介入)
              └─────────┘


┌────────────────────────────────────────────────────────────────┐
│                消费端去重表 (event_dedup)                        │
└────────────────────────────────────────────────────────────────┘

   消息到达
       │
       ▼
   ┌────────────────┐
   │ 查询 event_id   │
   │ 是否存在?       │
   └───────┬────────┘
           │
    ┌──────┴──────┐
    │             │
  存在          不存在
    │             │
    ▼             ▼
 ┌──────┐    ┌──────────┐
 │ 跳过  │    │ 插入记录  │
 │ ACK  │    │ 执行业务  │
 └──────┘    └──────────┘
                   │
                   ▼
              ┌────────┐
              │ SUCCESS │
              │  ACK   │
              └────────┘

三、实现方案

3.1 架构流程(简化版)

┌─────────────────────────────────────────────────┐
│  步骤1:本地事务(原子性)                         │
├─────────────────────────────────────────────────┤
│  订单服务 BEGIN TRANSACTION                       │
│  ├─ UPDATE orders SET status='COMPLETED'        │
│  └─ INSERT local_message (type, biz_id, data)  │
│  COMMIT                                         │
└─────────────────────────────────────────────────┘
           ↓
┌─────────────────────────────────────────────────┐
│  步骤2:定时扫描发送(每5秒)                       │
├─────────────────────────────────────────────────┤
│  SELECT * FROM local_message WHERE pending      │
│  ↓                                              │
│  RabbitMQ.send(message)                         │
│  ↓                                              │
│  UPDATE status='SENT'                           │
└─────────────────────────────────────────────────┘
           ↓
┌─────────────────────────────────────────────────┐
│  步骤3:幂等消费(异步)                           │
├─────────────────────────────────────────────────┤
│  营销服务监听 MQ                                  │
│  ├─ Redis.setIfAbsent(key) → 幂等检查           │
│  ├─ 发放积分                                     │
│  └─ UPDATE message status='COMPLETED'           │
└─────────────────────────────────────────────────┘

3.2 数据库设计

核心设计要点:

  • 全局唯一事件ID(支持幂等与回执追踪)
  • 状态细分(PENDING/SENDING/SENT/COMPLETED/FAILED)
  • 并发安全索引(支持 SKIP LOCKED)
  • 可观测字段(错误记录、时间戳)
  • 分区与归档(历史数据治理)
CREATE TABLE local_message_table (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    event_id CHAR(36) NOT NULL COMMENT '全局唯一事件ID(UUID)',
    biz_type VARCHAR(50) NOT NULL COMMENT '业务类型/事件类型',
    biz_id BIGINT NOT NULL COMMENT '业务聚合ID(如订单ID)',
    exchange VARCHAR(100) DEFAULT NULL COMMENT '目标交换机',
    routing_key VARCHAR(100) DEFAULT NULL COMMENT '路由键',
    content MEDIUMTEXT COMMENT '事件负载(JSON,大负载建议压缩/外置)',
    status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT 'PENDING/SENDING/SENT/COMPLETED/FAILED',
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_time DATETIME,
    last_error_at DATETIME NULL COMMENT '最后失败时间',
    last_error_msg VARCHAR(500) NULL COMMENT '失败原因',
    sent_at DATETIME NULL COMMENT '发送成功时间',
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_event_id (event_id),
    KEY idx_scan (status, next_retry_time, id),
    KEY idx_created (created_at)
) COMMENT='本地消息表(Outbox模式)';

-- 去重表(消费端幂等)
CREATE TABLE event_dedup (
    event_id CHAR(36) PRIMARY KEY,
    consumed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    KEY idx_consumed_at (consumed_at)
) COMMENT='事件去重表(保留7天)';

运维建议:

  • 历史数据按 created_at 分区,定期归档(如T+30天)
  • 大负载消息(>1MB)考虑外置到对象存储,仅存储引用
  • event_dedup 表定期清理(保留7天即可)

3.3 核心代码实现(生产级)

3.3.1 RabbitMQ 配置(启用 Confirm & Returns)

spring:
  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    port: 5672
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
    publisher-confirm-type: correlated  # 发布确认
    publisher-returns: true             # 路由失败回调
    template:
      mandatory: true                   # 强制路由检查
    listener:
      simple:
        acknowledge-mode: manual        # 手动ACK
        prefetch: 50                    # 消费者预取

3.3.2 步骤1:本地事务保存(带事件信封)

@Service
public class OrderService {
    
    @Transactional(rollbackFor = Exception.class)
    public void completeOrder(Long orderId) {
        // 1. 更新订单状态
        orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
        
        // 2. 构建事件信封(标准化)
        String eventId = UUID.randomUUID().toString();
        EventEnvelope<PointsPayload> envelope = EventEnvelope.builder()
            .eventId(eventId)
            .eventType("GRANT_POINTS")
            .aggregateId("order:" + orderId)
            .version(1)
            .occurredAt(LocalDateTime.now())
            .traceId(MDC.get("traceId"))
            .payload(new PointsPayload(orderId, userId, points))
            .build();
        
        // 3. 保存消息记录(同一事务,原子性保证)
        LocalMessage msg = LocalMessage.builder()
            .eventId(eventId)
            .bizType("GRANT_POINTS")
            .bizId(orderId)
            .routingKey("points.grant")
            .content(JSON.toJSONString(envelope))
            .status("PENDING")
            .nextRetryTime(LocalDateTime.now())
            .build();
        messageMapper.insert(msg);
        
        // 事务提交 → 订单状态和消息记录100%一致
    }
}

3.3.3 步骤2:并发扫描发送(SKIP LOCKED + Publisher Confirms)

@Component
public class MessageScanTask {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void initCallbacks() {
        // Confirm回调:MQ Broker确认收到
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String eventId = correlationData != null ? correlationData.getId() : null;
            if (ack) {
                messageMapper.updateStatusByEventId(eventId, "SENT", LocalDateTime.now());
            } else {
                log.error("发布失败 eventId={}, cause={}", eventId, cause);
                messageMapper.recordError(eventId, cause, calculateNextRetry());
            }
        });
        
        // Return回调:路由失败
        rabbitTemplate.setReturnsCallback(returned -> {
            String eventId = returned.getMessage().getMessageProperties().getMessageId();
            log.error("路由失败 eventId={}, reply={}", eventId, returned.getReplyText());
            messageMapper.recordError(eventId, returned.getReplyText(), calculateNextRetry());
        });
    }
    
    @Scheduled(cron = "*/5 * * * * ?") // 每5秒
    public void scan() {
        // 批量锁定(支持多实例并发,不重复)
        List<LocalMessage> batch = messageMapper.lockBatchForSending(200);
        
        for (LocalMessage msg : batch) {
            try {
                // 发送MQ(带CorrelationData用于Confirm回调)
                CorrelationData correlation = new CorrelationData(msg.getEventId());
                rabbitTemplate.convertAndSend(
                    defaultIfNull(msg.getExchange(), "maiban.events"),
                    msg.getRoutingKey(),
                    msg.getContent(),
                    message -> {
                        message.getMessageProperties().setMessageId(msg.getEventId());
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    },
                    correlation
                );
                // 注意:此处不更新状态,等Confirm回调时更新为SENT
            } catch (Exception e) {
                log.error("发送异常 eventId={}", msg.getEventId(), e);
                messageMapper.recordError(msg.getEventId(), e.getMessage(), calculateNextRetry());
            }
        }
    }
    
    // 指数退避 + 随机抖动(避免重试风暴)
    private LocalDateTime calculateNextRetry(int retryCount) {
        long baseDelayMs = switch(retryCount) {
            case 0 -> 5_000;      // 5秒
            case 1 -> 30_000;     // 30秒
            case 2 -> 300_000;    // 5分钟
            default -> 3600_000;  // 1小时(超限后降级)
        };
        long jitter = (long)(baseDelayMs * 0.1 * Math.random()); // ±10%抖动
        return LocalDateTime.now().plus(baseDelayMs + jitter, ChronoUnit.MILLIS);
    }
}

Mapper SQL(并发安全锁定):

<select id="lockBatchForSending" resultType="LocalMessage">
    SELECT id, event_id, biz_type, biz_id, exchange, routing_key, content
    FROM local_message_table
    WHERE status = 'PENDING' 
      AND next_retry_time &lt;= NOW()
    ORDER BY next_retry_time ASC, id ASC
    LIMIT #{batchSize}
    FOR UPDATE SKIP LOCKED
</select>

<update id="updateStatusByEventId">
    UPDATE local_message_table 
    SET status = #{status}, sent_at = #{sentAt}, updated_at = NOW()
    WHERE event_id = #{eventId}
</update>

3.3.4 步骤3:手动 ACK + 持久幂等消费

@Component
public class PointsConsumer {
    
    @RabbitListener(queues = "GRANT_POINTS.queue")
    public void handle(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String eventId = message.getMessageProperties().getMessageId();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        
        try {
            // 1. 持久幂等检查(数据库去重表)
            if (!dedupRepository.markConsumed(eventId)) {
                log.info("重复消息,已跳过 eventId={}", eventId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 2. 解析事件
            EventEnvelope<PointsPayload> envelope = JSON.parseObject(body, 
                new TypeReference<EventEnvelope<PointsPayload>>(){});
            PointsPayload payload = envelope.getPayload();
            
            // 3. 执行业务(业务层也需保证幂等)
            pointsService.grant(payload.getUserId(), payload.getPoints(), eventId);
            
            // 4. 手动ACK(业务成功后再ACK)
            channel.basicAck(deliveryTag, false);
            
            // 5. 更新出库表状态(可选,用于对账)
            messageMapper.updateCompleted(eventId);
            
        } catch (Exception e) {
            log.error("消费失败 eventId={}", eventId, e);
            // NACK不重回队列,进入DLQ或延迟队列
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

去重Repository(推荐用数据库而非短TTL Redis锁):

@Repository
public class EventDedupRepository {
    
    public boolean markConsumed(String eventId) {
        try {
            jdbcTemplate.update(
                "INSERT INTO event_dedup(event_id) VALUES(?)", 
                eventId
            );
            return true; // 首次消费
        } catch (DuplicateKeyException e) {
            return false; // 重复消费
        }
    }
}

四、实际业务场景

4.1 场景1:技师完单流程

业务需求:

技师完成服务 → 订单完成 → 佣金计算 → 师傅提成 → 积分发放 → 推送通知

实现代码:

// 主流程:快速响应(<100ms)
@Transactional
public void completeService(Long orderId) {
    // 1. 更新订单状态
    orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
    
    // 2. 批量插入消息(一个事务)
    messageMapper.batchInsert(Arrays.asList(
        buildMessage("CALCULATE_COMMISSION", orderId),  // 佣金
        buildMessage("MENTOR_COMMISSION", orderId),     // 师傅提成
        buildMessage("GRANT_POINTS", orderId),          // 积分
        buildMessage("SEND_NOTIFICATION", orderId)      // 通知
    ));
}

// 异步消费者1:佣金计算
@RabbitListener(queues = "CALCULATE_COMMISSION.queue")
public void handleCommission(OrderEvent event) {
    BigDecimal amount = event.getAmount().multiply(new BigDecimal("0.15"));
    commissionService.create(event.getNurseId(), amount);
}

// 异步消费者2:师傅提成
@RabbitListener(queues = "MENTOR_COMMISSION.queue")
public void handleMentor(OrderEvent event) {
    MentorRelation relation = mentorService.get(event.getNurseId());
    if (relation != null) {
        BigDecimal amount = event.getAmount().multiply(new BigDecimal("0.02"));
        commissionService.create(relation.getMentorId(), amount);
    }
}

4.2 场景2:用户下单支付

业务需求:

创建订单 → 支付 → 通知技师 → 发放积分 → 发送短信

实现代码:

// 阶段1:创建订单(本地事务)
@Transactional
public Order createOrder(OrderDTO dto) {
    Order order = new Order();
    order.setStatus(OrderStatus.PENDING);
    orderMapper.insert(order);
    return order;
}

// 阶段2:支付(本地事务 + 消息)
@Transactional
public void payOrder(Long orderId) {
    // 1. 更新订单状态
    orderMapper.updateStatus(orderId, OrderStatus.PAID);
    
    // 2. 保存后续流程消息
    messageMapper.batchInsert(Arrays.asList(
        buildMessage("NOTIFY_NURSE", orderId),
        buildMessage("GRANT_POINTS", orderId),
        buildMessage("SEND_SMS", orderId)
    ));
}

关键点:

  • 主流程快速完成(<100ms)
  • 后续流程异步处理,不阻塞用户
  • 失败自动重试,不丢消息

五、特殊场景处理

5.1 涉及资金的退款场景

方案:仍然用本地消息表,不用Seata

实现:

// 退款主流程
@Transactional
public void refundOrder(Long orderId) {
    Order order = orderMapper.selectById(orderId);
    
    // 1. 更新订单状态为退款中
    order.setStatus(OrderStatus.REFUNDING);
    orderMapper.updateById(order);
    
    // 2. 保存退款消息
    LocalMessage msg = buildMessage("PROCESS_REFUND", orderId);
    messageMapper.insert(msg);
}

// 退款消费者(支付服务)
@RabbitListener(queues = "PROCESS_REFUND.queue")
public void handleRefund(RefundEvent event) {
    String lockKey = "refund:" + event.getOrderId();
    
    if (redisTemplate.setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES)) {
        try {
            // 1. 调用微信/支付宝退款API
            RefundResult result = wechatPayService.refund(event);
            
            if (result.isSuccess()) {
                // 2. 创建退款记录
                paymentMapper.insertRefund(event.getOrderId(), event.getAmount());
                
                // 3. 通知订单服务更新状态
                orderService.updateStatus(event.getOrderId(), OrderStatus.REFUNDED);
                
                // 4. 发送余额返还消息
                LocalMessage balanceMsg = buildMessage("ADD_BALANCE", event);
                messageMapper.insert(balanceMsg);
            }
        } catch (Exception e) {
            log.error("退款失败", e);
            throw e; // 触发MQ重试
        }
    }
}

// 余额返还消费者(用户服务)
@RabbitListener(queues = "ADD_BALANCE.queue")
public void handleAddBalance(BalanceEvent event) {
    if (redisTemplate.setIfAbsent("balance:" + event.getOrderId(), "1")) {
        userMapper.addBalance(event.getUserId(), event.getAmount());
    }
}

为什么不用Seata:

  • 退款是异步操作(微信/支付宝需要时间处理)
  • 本地消息表能保证可靠性
  • 性能好,不阻塞用户

5.2 秒杀库存扣减

方案:Redis预扣减 + 本地消息表

实现:

// 秒杀下单
public boolean seckillOrder(Long productId, Long userId) {
    // 1. Redis原子扣减库存
    Long stock = redisTemplate.opsForValue().decrement("seckill:stock:" + productId);
    
    if (stock < 0) {
        // 库存不足
        redisTemplate.opsForValue().increment("seckill:stock:" + productId);
        return false;
    }
    
    // 2. 创建订单 + 消息
    createOrderWithMessage(productId, userId);
    
    return true;
}

// 异步扣减数据库库存
@RabbitListener(queues = "DEDUCT_STOCK.queue")
public void deductStock(StockEvent event) {
    stockMapper.deduct(event.getProductId(), event.getQuantity());
}

六、数据对账(兜底保障)

6.1 每日对账任务

@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
public void dailyReconciliation() {
    LocalDate yesterday = LocalDate.now().minusDays(1);
    
    // 1. 查询昨日已支付订单
    List<Order> paidOrders = orderMapper.selectPaidOrders(yesterday);
    
    // 2. 查询对应支付流水
    Map<Long, Payment> paymentMap = paymentService.batchQuery(orderIds);
    
    // 3. 比对差异
    List<Long> inconsistent = new ArrayList<>();
    for (Order order : paidOrders) {
        if (paymentMap.get(order.getId()) == null) {
            inconsistent.add(order.getId());
        }
    }
    
    // 4. 告警 + 自动补偿
    if (!inconsistent.isEmpty()) {
        alarmService.send("发现" + inconsistent.size() + "条差异");
        compensationService.fix(inconsistent);
    }
}

6.2 关键指标

指标目标值
消息延迟< 10秒
对账差异率< 0.1%
补偿成功率> 99%

七、最佳实践

7.1 幂等性保证(多层防护)

7.1.1 推荐方案:持久去重表(最可靠)

优势:即使消息延迟数天后重投递也能去重,避免短TTL锁过期失效。

-- 去重表(已在2.2节定义)
CREATE TABLE event_dedup (
    event_id CHAR(36) PRIMARY KEY,
    consumed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    KEY idx_consumed_at (consumed_at)
);
@Repository
public class EventDedupRepository {
    
    @Transactional
    public boolean markConsumed(String eventId) {
        try {
            jdbcTemplate.update(
                "INSERT INTO event_dedup(event_id, consumed_at) VALUES(?, NOW())", 
                eventId
            );
            return true; // 首次消费,允许处理
        } catch (DuplicateKeyException e) {
            return false; // 重复消费,已跳过
        }
    }
}

// 定期清理(保留7天即可)
@Scheduled(cron = "0 0 3 * * ?")
public void cleanupDedup() {
    jdbcTemplate.update(
        "DELETE FROM event_dedup WHERE consumed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)"
    );
}

7.1.2 业务唯一索引(核心防护)

关键业务必须有数据库唯一约束,作为最后防线。

-- 示例:积分发放
CREATE UNIQUE INDEX uk_points_event ON user_points_log(event_id);

-- 示例:退款记录
CREATE UNIQUE INDEX uk_refund_order ON refunds(order_id);

-- 示例:佣金发放
CREATE UNIQUE INDEX uk_commission_event ON commissions(event_id, nurse_id);

7.1.3 外部调用幂等键(支付/短信等)

// 退款必须带幂等键
RefundRequest request = RefundRequest.builder()
    .outRefundNo(eventId)  // 幂等键(对方去重)
    .orderId(orderId)
    .amount(amount)
    .build();

RefundResult result = wechatPayService.refund(request);

// 我方也需记录,避免重复退款
refundMapper.insert(new Refund(eventId, orderId, amount));

7.1.4 状态机(辅助检查)

// 订单状态流转
@Transactional
public void updateOrderStatus(Long orderId, OrderStatus targetStatus) {
    Order order = orderMapper.selectForUpdate(orderId);
    
    if (!order.getStatus().canTransitionTo(targetStatus)) {
        log.warn("状态转换非法 orderId={}, from={}, to={}", 
            orderId, order.getStatus(), targetStatus);
        return; // 已处理过或状态非法
    }
    
    orderMapper.updateStatus(orderId, targetStatus);
}

7.1.5 Redis 短期锁(可选,限流场景)

仅用于需要即时去重的高频场景(如防刷、限流),不推荐作为主幂等手段。

// 防止5分钟内重复操作
String lockKey = "dedup:action:" + userId + ":" + actionType;
if (redisTemplate.setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES)) {
    // 执行操作
}

幂等性分层总结:

  1. 消息层:event_dedup 表去重(T+7天有效)
  2. 业务层:唯一索引约束(永久有效)
  3. 外部层:第三方幂等键(对方去重)
  4. 状态层:状态机流转检查(逻辑保护)

7.2 失败重试策略(分层设计)

7.2.1 发布端重试(数据库扫描 + 抖动)

指数退避 + 随机抖动(已在 2.3.3 实现):

private LocalDateTime calculateNextRetry(int retryCount) {
    long baseDelayMs = switch(retryCount) {
        case 0 -> 5_000;      // 5秒
        case 1 -> 30_000;     // 30秒
        case 2 -> 300_000;    // 5分钟
        case 3 -> 1_800_000;  // 30分钟
        default -> 3_600_000; // 1小时(超限降级)
    };
    // ±10% 随机抖动,避免重试风暴
    long jitter = (long)(baseDelayMs * 0.1 * (Math.random() * 2 - 1));
    return LocalDateTime.now().plus(baseDelayMs + jitter, ChronoUnit.MILLIS);
}

// 超过5次失败 → 置为 FAILED,告警人工介入
if (msg.getRetryCount() >= 5) {
    messageMapper.updateStatus(msg.getEventId(), "FAILED");
    alarmService.sendAlert("消息发送失败超限", msg);
}

7.2.2 消费端重试(MQ 延迟队列 + DLX)

推荐:利用 RabbitMQ 插件/特性实现自动延迟重试,降低 DB 压力。

方案A:使用 x-delayed-message 插件(推荐)

// 1. 安装插件
// rabbitmq-plugins enable rabbitmq_delayed_message_exchange

// 2. 声明延迟交换机
@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "topic");
    return new CustomExchange("maiban.events.delayed", "x-delayed-message", true, false, args);
}

// 3. 消费失败时重投延迟队列
@RabbitListener(queues = "GRANT_POINTS.queue")
public void handle(Message message, Channel channel) throws IOException {
    try {
        // 业务处理...
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        Integer retryCount = (Integer) message.getMessageProperties()
            .getHeader("x-retry-count");
        retryCount = (retryCount == null) ? 0 : retryCount + 1;
        
        if (retryCount >= 3) {
            // 超限进DLQ
            channel.basicNack(deliveryTag, false, false);
            dlqHandler.handle(message);
        } else {
            // 延迟重投
            long delayMs = calculateDelay(retryCount); // 5s, 30s, 5min
            message.getMessageProperties().setHeader("x-delay", delayMs);
            message.getMessageProperties().setHeader("x-retry-count", retryCount);
            
            rabbitTemplate.convertAndSend(
                "maiban.events.delayed",
                message.getMessageProperties().getReceivedRoutingKey(),
                message
            );
            
            channel.basicAck(deliveryTag, false); // ACK原消息
        }
    }
}

方案B:使用 TTL + DLX(无需插件)

// 1. 声明死信交换机与延迟队列
@Bean
public Queue retryQueue() {
    return QueueBuilder.durable("GRANT_POINTS.retry")
        .withArgument("x-dead-letter-exchange", "maiban.events")
        .withArgument("x-dead-letter-routing-key", "points.grant")
        .withArgument("x-message-ttl", 5000) // 5秒后重投
        .build();
}

// 2. 消费失败投递到重试队列
catch (Exception e) {
    rabbitTemplate.convertAndSend("GRANT_POINTS.retry", message);
    channel.basicAck(deliveryTag, false);
}

7.2.3 DLQ(死信队列)处理

@Component
public class DLQHandler {
    
    @RabbitListener(queues = "maiban.events.dlq")
    public void handleDLQ(Message message) {
        String eventId = message.getMessageProperties().getMessageId();
        String body = new String(message.getBody());
        
        // 记录到数据库
        dlqRecordMapper.insert(DLQRecord.builder()
            .eventId(eventId)
            .routingKey(message.getMessageProperties().getReceivedRoutingKey())
            .body(body)
            .failReason(message.getMessageProperties().getHeader("x-exception-message"))
            .retryCount(message.getMessageProperties().getHeader("x-retry-count"))
            .createdAt(LocalDateTime.now())
            .build());
        
        // 告警
        alarmService.sendAlert("消息进入DLQ", eventId);
    }
    
    // 人工修复后批量回放
    public void replayFromDLQ(List<String> eventIds) {
        List<DLQRecord> records = dlqRecordMapper.selectByIds(eventIds);
        for (DLQRecord record : records) {
            rabbitTemplate.convertAndSend(
                "maiban.events.delayed",
                record.getRoutingKey(),
                record.getBody(),
                msg -> {
                    msg.getMessageProperties().setHeader("x-delay", 0);
                    msg.getMessageProperties().setHeader("x-retry-count", 0);
                    return msg;
                }
            );
        }
    }
}

7.2.4 重试策略总结

层级机制延迟策略上限超限处理
发布端DB扫描 + 定时任务5s/30s/5m/30m/1h5次FAILED状态 + 告警
消费端MQ延迟队列5s/30s/5m3次进DLQ
DLQ人工回放修复后按需-运维介入

7.3 监控告警与运维(可观测性)

7.3.1 核心监控指标

出库表(Outbox)指标:

// Prometheus 指标采集
@Component
public class OutboxMetrics {
    
    @Scheduled(fixedRate = 10000) // 每10秒
    public void collect() {
        // 1. 各状态消息数量
        Map<String, Long> countByStatus = messageMapper.countByStatus();
        meterRegistry.gauge("maiban_outbox_pending", countByStatus.getOrDefault("PENDING", 0L));
        meterRegistry.gauge("maiban_outbox_sending", countByStatus.getOrDefault("SENDING", 0L));
        meterRegistry.gauge("maiban_outbox_failed", countByStatus.getOrDefault("FAILED", 0L));
        
        // 2. 最老未发送消息的年龄(秒)
        LocalDateTime oldest = messageMapper.selectOldestPending();
        if (oldest != null) {
            long ageSeconds = Duration.between(oldest, LocalDateTime.now()).getSeconds();
            meterRegistry.gauge("maiban_outbox_oldest_pending_age_seconds", ageSeconds);
        }
        
        // 3. 过去1分钟发送成功/失败数
        meterRegistry.counter("maiban_outbox_sent_total").increment(sentCount);
        meterRegistry.counter("maiban_outbox_failed_total").increment(failedCount);
    }
}

MQ 消费者指标:

@Component
public class ConsumerMetrics {
    
    @Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
    public Object measureConsume(ProceedingJoinPoint joinPoint) throws Throwable {
        String queueName = extractQueueName(joinPoint);
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            Object result = joinPoint.proceed();
            sample.stop(Timer.builder("maiban_consume_duration_ms")
                .tag("queue", queueName)
                .tag("status", "success")
                .register(meterRegistry));
            return result;
        } catch (Exception e) {
            sample.stop(Timer.builder("maiban_consume_duration_ms")
                .tag("queue", queueName)
                .tag("status", "failure")
                .register(meterRegistry));
            meterRegistry.counter("maiban_consume_fail_total", "queue", queueName).increment();
            throw e;
        }
    }
}

关键指标清单:

指标含义告警阈值处理建议
maiban_outbox_pendingPENDING状态消息数>1000扩大批量/增加实例
maiban_outbox_oldest_pending_age_seconds最老消息滞留时长>300秒检查MQ连接/网络
maiban_outbox_failedFAILED状态消息数>10人工介入排查
maiban_publish_confirm_latency_ms发布确认延迟P99>500msMQ性能排查
maiban_consume_duration_ms消费处理时延P95>3s业务逻辑优化
maiban_consume_fail_total消费失败总数增长率>5%/min下游服务排查
maiban_dlq_message_countDLQ堆积数>100批量回放/修复
maiban_dedup_hit_rate去重命中率>10%排查重复投递原因

7.3.2 告警规则配置(Prometheus AlertManager)

groups:
  - name: maiban_outbox_alerts
    rules:
      - alert: OutboxPendingHigh
        expr: maiban_outbox_pending > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "出库表堆积超阈值"
          description: "PENDING消息数: {{ $value }}"
      
      - alert: OutboxOldestMessageStuck
        expr: maiban_outbox_oldest_pending_age_seconds > 300
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "消息滞留超5分钟"
          description: "最老消息年龄: {{ $value }}秒"
      
      - alert: ConsumeFailureRateHigh
        expr: rate(maiban_consume_fail_total[1m]) > 0.05
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "消费失败率 >5%"
      
      - alert: DLQAccumulating
        expr: maiban_dlq_message_count > 100
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "DLQ堆积超100条"

7.3.3 运维 Runbook(故障处理手册)

场景1:出库表堆积(PENDING >1000)

# 1. 检查扫描任务是否正常
kubectl logs -f <pod-name> | grep MessageScanTask

# 2. 检查MQ连接
kubectl exec -it <pod-name> -- curl http://rabbitmq:15672/api/connections

# 3. 临时扩容扫描批量(热配置)
curl -X POST http://admin-api/config/update \
  -d '{"key":"outbox.scan.batchSize","value":"500"}'

# 4. 增加扫描实例
kubectl scale deployment order-service --replicas=5

场景2:DLQ 告警(需批量回放)

# 1. 查询 DLQ 原因分布
SELECT fail_reason, COUNT(*) 
FROM dlq_records 
WHERE created_at > DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY fail_reason;

# 2. 修复根因(如下游服务恢复)后,批量回放
curl -X POST http://admin-api/dlq/replay \
  -H "Content-Type: application/json" \
  -d '{"eventIds": ["uuid1", "uuid2", ...], "delayMs": 0}'

# 3. 监控回放成功率
watch -n 1 'rabbitmqctl list_queues name messages'

场景3:消费延迟高(P95 >3s)

# 1. 定位慢消费队列
curl http://prometheus:9090/api/v1/query?query='topk(5, maiban_consume_duration_ms{quantile="0.95"})'

# 2. 查看该队列消费者日志
kubectl logs -f <pod-name> --tail=100 | grep "GRANT_POINTS"

# 3. 如是业务逻辑慢,优化代码;如是下游慢,增加超时/熔断
# 临时增加消费者并发
kubectl scale deployment marketing-service --replicas=10

场景4:Publisher Confirm 持续失败

# 1. 检查 RabbitMQ 健康
rabbitmqctl status
rabbitmqctl list_queues | grep -E "messages|consumers"

# 2. 检查网络连通性
kubectl exec -it <pod-name> -- telnet rabbitmq 5672

# 3. 降级为本地积压(等MQ恢复后自动发送)
# 无需人工干预,扫描任务会持续重试

7.3.4 日志规范与链路追踪

// 统一日志格式(携带 traceId、eventId)
@Slf4j
public class OrderService {
    
    @Transactional
    public void completeOrder(Long orderId) {
        String traceId = MDC.get("traceId");
        String eventId = UUID.randomUUID().toString();
        
        log.info("订单完成 orderId={}, traceId={}, eventId={}", orderId, traceId, eventId);
        
        // 业务逻辑...
        
        MDC.put("eventId", eventId);
        messageMapper.insert(msg);
        
        log.info("出库消息已保存 eventId={}, bizType={}", eventId, "GRANT_POINTS");
    }
}

// 消费端日志
@RabbitListener(queues = "GRANT_POINTS.queue")
public void handle(Message message) {
    String eventId = message.getMessageProperties().getMessageId();
    String traceId = message.getMessageProperties().getHeader("traceId");
    
    MDC.put("traceId", traceId);
    MDC.put("eventId", eventId);
    
    log.info("开始消费 eventId={}, traceId={}", eventId, traceId);
    // 业务处理...
    log.info("消费完成 eventId={}, cost={}ms", eventId, cost);
}

7.3.5 可视化监控面板(Grafana)

推荐面板布局:

  1. 出库健康:PENDING/SENDING/FAILED 趋势图、最老消息年龄
  2. 发布性能:发送成功率、Confirm延迟分位图、失败原因TOP5
  3. 消费性能:各队列处理时延P50/P95/P99、消费失败率趋势
  4. 异常追踪:DLQ堆积、重复消费次数、超时告警列表
  5. 业务指标:订单完成→积分发放端到端时延、资金类事件成功率

八、总结

核心原则

唯一方案:本地消息表 + RabbitMQ

为什么是最优解:

  • ✅ 性能优异:80ms响应,800 TPS吞吐
  • ✅ 可靠性高:消息不丢失
  • ✅ 实现简单:无需引入Seata Server
  • ✅ 稳定性强:无单点风险
  • ✅ 成本低:只需RabbitMQ

实施要点

  1. 所有跨服务操作都用本地消息表
  2. 消费者必须实现幂等性
  3. 定时扫描间隔5秒
  4. 失败重试最多3次
  5. 每日对账兜底

性能数据

  • 主流程响应:< 100ms
  • 消息延迟:< 10秒
  • 系统吞吐:800 TPS
  • 可用性:99.9%

维护者: 麦瓣健康技术团队
文档版本: v2.0(生产级优化版)
更新时间: 2025-10-21

在 GitHub 上编辑此页
最后更新: 2025/11/10 10:53
贡献者: David, Claude Code
Prev
数据库设计文档
Next
日志管理与配置说明