麦瓣健康 - 微服务分布式事务一致性设计方案
项目名称: 麦瓣健康 - 分布式事务一致性技术方案
核心方案: 本地消息表 + RabbitMQ(最终一致性)
文档版本: v2.0(生产级优化版)
最后更新: 2025-10-21
一、唯一最优解:本地消息表 + MQ
1.1 为什么不用 Seata
性能对比:
| 指标 | 本地消息表+MQ | Seata AT |
|---|---|---|
| 响应时间 | 80ms | 500ms (6倍慢) |
| 吞吐量 | 800 TPS | 100 TPS (8倍低) |
| 复杂度 | 低 | 高(需Seata Server) |
| 稳定性 | 高 | 有单点风险 |
结论:Seata性能代价太大,不使用。
1.2 核心设计思想
三步走策略:
1. 本地事务:保证核心数据一致(订单状态+消息记录)
2. 定时扫描:发送消息到MQ(可靠性保证)
3. 幂等消费:异步处理业务(性能优化)
适用场景(100%业务):
- ✅ 订单完成 → 积分发放
- ✅ 支付成功 → 通知技师
- ✅ 用户下单 → 发送短信
- ✅ 技师完单 → 佣金计算
- ✅ 会员开通 → 发放优惠券
- ✅ 退款操作 → 余额返还
二、系统架构设计
2.0 整体架构图
2.0.1 微服务全景架构
┌──────────────────────────────────────────────────────────────────────────────────┐
│ 麦瓣健康 - 微服务架构全景图 │
├──────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 用户端APP │ │ 技师端APP │ │ 管理后台 │ │ H5/APP │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ └─────────────────┴─────────────────┴─────────────────┘ │
│ │ │
│ ┌───────────▼───────────┐ │
│ │ API Gateway (网关) │ │
│ │ - 鉴权、限流、路由 │ │
│ └───────────┬───────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌───────▼────────┐ ┌───────▼────────┐ │
│ │ 用户服务 │ │ 订单服务 │ │ 支付服务 │ │
│ │ (8081) │ │ (8083) │ │ (8084) │ │
│ │ │ │ ┌──────────┐ │ │ │ │
│ │ • 用户管理 │◄──────────┤ │ 本地消息表 │ ├────────►│ • 支付流水 │ │
│ │ • 地址管理 │ 查询用户 │ │ (Outbox) │ │ 发起支付 │ • 退款管理 │ │
│ │ • 家庭成员 │ │ └─────┬────┘ │ │ │ │
│ └─────────────┘ │ │ │ └────────────────┘ │
│ │ │ ┌─────▼────┐ │ │ │
│ │ │ │ 扫描任务 │ │ │ │
│ ┌──────▼──────┐ │ │ (每5秒) │ │ ┌────────▼────────┐ │
│ │ 技师服务 │ │ └─────┬────┘ │ │ 营销服务 │ │
│ │ (8082) │ │ │ │ │ (8090) │ │
│ │ │ │ │ │ │ │ │
│ │ • 技师认证 │◄──────────┤ ┌─────▼────┐ ├────────►│ • 优惠券 │ │
│ │ • 评价管理 │ 查询技师 │ │ 事件去重表 │ │ 扣券积分 │ • 积分管理 │ │
│ │ • 师徒体系 │ │ │ (Dedup) │ │ │ • 红包活动 │ │
│ └─────────────┘ │ └──────────┘ │ └─────────────────┘ │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ ┌──────▼──────┐ │ ┌────────▼────────┐ │
│ │ 健康管理服务 │ │ │ 商城服务 │ │
│ │ (8087) │ │ │ (8091) │ │
│ │ │ │ │ │ │
│ │ • 健康档案 │◄───────────────────┼─────────────────►│ • 商品管理 │ │
│ │ • 护理日志 │ 事件通知 │ 下单购买 │ • 购物车 │ │
│ │ • IoT设备 │ │ │ • 物流跟踪 │ │
│ └─────────────┘ │ └─────────────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌───────▼────────┐ ┌───────▼────────┐ │
│ │ IM消息服务 │ │ 运营管理服务 │ │ RabbitMQ │ │
│ │ (8085) │ │ (8092) │ │ 消息队列 │ │
│ │ │ │ │ │ │ │
│ │ • 即时聊天 │ │ • 审核管理 │ │ ┌────────────┐ │ │
│ │ • 推送通知 │ │ • 数据统计 │ │ │ 点对点队列 │ │ │
│ │ • MongoDB │ │ • 权限配置 │ │ │GRANT_POINTS│ │ │
│ └─────────────┘ └────────────────┘ │ │COMMISSION │ │ │
│ │ │SEND_SMS... │ │ │
│ │ └────────────┘ │ │
│ │ ┌────────────┐ │ │
│ │ │ 延迟队列 │ │ │
│ │ │ (重试) │ │ │
│ │ └────────────┘ │ │
│ │ ┌────────────┐ │ │
│ │ │ DLQ │ │ │
│ │ │ (死信队列) │ │ │
│ │ └────────────┘ │ │
│ └────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │ 基础设施层 │ │
│ ├──────────────────────────────────────────────────────────────────────────┤ │
│ │ MySQL 8.0 (主从) │ Redis 7.x │ MongoDB 5.0 │ OSS │ Prometheus │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────────┘
2.0.2 分布式事务流程架构(核心)
┌────────────────────────────────────────────────────────────────────────────────┐
│ 分布式事务一致性保证 - Outbox 模式 │
└────────────────────────────────────────────────────────────────────────────────┘
【业务场景示例:用户下单 → 扣库存 → 支付 → 发积分 → 通知技师】
┌─────────────────────────────────────────────────────────────────────────────┐
│ 阶段1:本地事务(订单服务)- 保证原子性 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ │
│ │ 订单服务 DB │ │
│ │ (MySQL) │ │
│ └────────┬─────────┘ │
│ │ │
│ BEGIN TRANSACTION; │
│ ┌────────▼──────────────────────────────────────────────┐ │
│ │ 1. UPDATE orders SET status='PAID' │ │
│ │ WHERE order_id = 12345; │ │
│ │ │ │
│ │ 2. UPDATE sku_stock SET stock = stock - 1 │ │
│ │ WHERE sku_id = 789; │ │
│ │ │ │
│ │ 3. INSERT INTO local_message_table VALUES( │ │
│ │ event_id: 'uuid-001', │ │
│ │ biz_type: 'ORDER_PAID', │ │
│ │ routing_key: 'order.paid', │ │
│ │ content: '{"orderId":12345,"userId":100,...}', │ │
│ │ status: 'PENDING' │ │
│ │ ); │ │
│ │ │ │
│ │ 4. INSERT INTO local_message_table VALUES( │ │
│ │ event_id: 'uuid-002', │ │
│ │ biz_type: 'GRANT_POINTS', │ │
│ │ routing_key: 'points.grant', │ │
│ │ content: '{"orderId":12345,"points":50,...}', │ │
│ │ status: 'PENDING' │ │
│ │ ); │ │
│ └───────────────────────────────────────────────────────┘ │
│ COMMIT; ✅ 订单状态、库存、消息记录 100% 一致 │
│ │
│ 响应用户:200 OK "下单成功" (耗时 < 100ms) │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 阶段2:异步发布(订单服务)- 可靠投递 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ 定时扫描任务 │ │ RabbitMQ │ │
│ │ (每5秒执行) │ │ 消息代理 │ │
│ └────────┬─────────┘ └────────▲─────────┘ │
│ │ │ │
│ ┌────────▼────────────────────────────┴────────────────┐ │
│ │ SELECT id, event_id, routing_key, content │ │
│ │ FROM local_message_table │ │
│ │ WHERE status = 'PENDING' │
│ │ AND next_retry_time <= NOW() │ │
│ │ ORDER BY next_retry_time ASC │ │
│ │ LIMIT 200 │ │
│ │ FOR UPDATE SKIP LOCKED; ← 多实例并发安全 │ │
│ │ │ │
│ │ UPDATE SET status = 'SENDING'; ← 中间态防重 │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ FOR EACH message: │ │
│ │ rabbitTemplate.convertAndSend( │ │
│ │ exchange: "maiban.events", │ │
│ │ routingKey: message.routing_key, │ │
│ │ body: message.content, │ │
│ │ messageId: message.event_id, │ │
│ │ correlationData: new CorrelationData(event_id) │ │
│ │ ); │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Publisher Confirm 回调: │ │
│ │ ✅ ACK → UPDATE status='SENT', sent_at=NOW() │ │
│ │ ❌ NACK → UPDATE status='PENDING', retry_count+1, │ │
│ │ next_retry_time = NOW() + 退避延迟 │ │
│ └───────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ 阶段3:幂等消费(下游服务)- 最终一致性 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ 营销服务 (8090) - 监听队列: points.grant │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────▼────────────────────────────┐ │
│ │ @RabbitListener(queues = "GRANT_POINTS.queue") │ │
│ │ public void handle(Message msg, Channel channel) { │ │
│ │ │ │
│ │ String eventId = msg.getMessageProperties().getMessageId();│ │
│ │ │ │
│ │ // 1️⃣ 幂等检查(持久化去重表) │ │
│ │ if (!dedupRepo.markConsumed(eventId)) { │ │
│ │ channel.basicAck(deliveryTag, false); │ │
│ │ return; // 已处理过,跳过 │ │
│ │ } │ │
│ │ │ │
│ │ try { │ │
│ │ // 2️⃣ 执行业务逻辑 │ │
│ │ PointsEvent event = JSON.parse(msg.getBody()); │ │
│ │ pointsService.grant(event.getUserId(), 50); │ │
│ │ │ │
│ │ // 3️⃣ 手动 ACK(业务成功后) │ │
│ │ channel.basicAck(deliveryTag, false); │ │
│ │ │ │
│ │ // 4️⃣ 更新出库表状态(可选,用于对账) │ │
│ │ messageMapper.updateCompleted(eventId); │ │
│ │ │ │
│ │ } catch (Exception e) { │ │
│ │ // 5️⃣ NACK 不重回队列,进入延迟队列/DLQ │ │
│ │ channel.basicNack(deliveryTag, false, false); │ │
│ │ } │ │
│ │ } │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ ✅ 结果:用户积分已发放,数据最终一致 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│
┌───────────┴───────────┐
│ │
✅ 消费成功 ❌ 消费失败(超3次)
│ │
▼ ▼
┌─────────────────────┐ ┌──────────────────────┐
│ 更新状态: COMPLETED │ │ 进入 DLQ 死信队列 │
│ event_dedup 保留7天 │ │ 告警 + 人工介入 │
└─────────────────────┘ └──────────────────────┘
2.0.3 数据流与状态机
┌────────────────────────────────────────────────────────────────┐
│ 本地消息表状态机 (local_message_table) │
└────────────────────────────────────────────────────────────────┘
┌─────────┐
│ PENDING │ ← 初始状态(业务事务插入)
└────┬────┘
│
│ 扫描任务抢占(FOR UPDATE SKIP LOCKED)
▼
┌─────────┐
│ SENDING │ ← 中间态(正在发送MQ)
└────┬────┘
│
┌──────┴──────┐
│ │
ACK确认 NACK失败
│ │
▼ ▼
┌────────┐ ┌──────────┐
│ SENT │ │ PENDING │ ← 回到待发送(增加重试次数)
└────────┘ └────┬─────┘
│
│ 重试超限(>5次)
▼
┌─────────┐
│ FAILED │ ← 失败终态(告警人工介入)
└─────────┘
┌────────────────────────────────────────────────────────────────┐
│ 消费端去重表 (event_dedup) │
└────────────────────────────────────────────────────────────────┘
消息到达
│
▼
┌────────────────┐
│ 查询 event_id │
│ 是否存在? │
└───────┬────────┘
│
┌──────┴──────┐
│ │
存在 不存在
│ │
▼ ▼
┌──────┐ ┌──────────┐
│ 跳过 │ │ 插入记录 │
│ ACK │ │ 执行业务 │
└──────┘ └──────────┘
│
▼
┌────────┐
│ SUCCESS │
│ ACK │
└────────┘
三、实现方案
3.1 架构流程(简化版)
┌─────────────────────────────────────────────────┐
│ 步骤1:本地事务(原子性) │
├─────────────────────────────────────────────────┤
│ 订单服务 BEGIN TRANSACTION │
│ ├─ UPDATE orders SET status='COMPLETED' │
│ └─ INSERT local_message (type, biz_id, data) │
│ COMMIT │
└─────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ 步骤2:定时扫描发送(每5秒) │
├─────────────────────────────────────────────────┤
│ SELECT * FROM local_message WHERE pending │
│ ↓ │
│ RabbitMQ.send(message) │
│ ↓ │
│ UPDATE status='SENT' │
└─────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ 步骤3:幂等消费(异步) │
├─────────────────────────────────────────────────┤
│ 营销服务监听 MQ │
│ ├─ Redis.setIfAbsent(key) → 幂等检查 │
│ ├─ 发放积分 │
│ └─ UPDATE message status='COMPLETED' │
└─────────────────────────────────────────────────┘
3.2 数据库设计
核心设计要点:
- 全局唯一事件ID(支持幂等与回执追踪)
- 状态细分(PENDING/SENDING/SENT/COMPLETED/FAILED)
- 并发安全索引(支持 SKIP LOCKED)
- 可观测字段(错误记录、时间戳)
- 分区与归档(历史数据治理)
CREATE TABLE local_message_table (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
event_id CHAR(36) NOT NULL COMMENT '全局唯一事件ID(UUID)',
biz_type VARCHAR(50) NOT NULL COMMENT '业务类型/事件类型',
biz_id BIGINT NOT NULL COMMENT '业务聚合ID(如订单ID)',
exchange VARCHAR(100) DEFAULT NULL COMMENT '目标交换机',
routing_key VARCHAR(100) DEFAULT NULL COMMENT '路由键',
content MEDIUMTEXT COMMENT '事件负载(JSON,大负载建议压缩/外置)',
status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT 'PENDING/SENDING/SENT/COMPLETED/FAILED',
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME,
last_error_at DATETIME NULL COMMENT '最后失败时间',
last_error_msg VARCHAR(500) NULL COMMENT '失败原因',
sent_at DATETIME NULL COMMENT '发送成功时间',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_event_id (event_id),
KEY idx_scan (status, next_retry_time, id),
KEY idx_created (created_at)
) COMMENT='本地消息表(Outbox模式)';
-- 去重表(消费端幂等)
CREATE TABLE event_dedup (
event_id CHAR(36) PRIMARY KEY,
consumed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_consumed_at (consumed_at)
) COMMENT='事件去重表(保留7天)';
运维建议:
- 历史数据按
created_at分区,定期归档(如T+30天) - 大负载消息(>1MB)考虑外置到对象存储,仅存储引用
event_dedup表定期清理(保留7天即可)
3.3 核心代码实现(生产级)
3.3.1 RabbitMQ 配置(启用 Confirm & Returns)
spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: 5672
username: ${RABBITMQ_USER:guest}
password: ${RABBITMQ_PASS:guest}
publisher-confirm-type: correlated # 发布确认
publisher-returns: true # 路由失败回调
template:
mandatory: true # 强制路由检查
listener:
simple:
acknowledge-mode: manual # 手动ACK
prefetch: 50 # 消费者预取
3.3.2 步骤1:本地事务保存(带事件信封)
@Service
public class OrderService {
@Transactional(rollbackFor = Exception.class)
public void completeOrder(Long orderId) {
// 1. 更新订单状态
orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
// 2. 构建事件信封(标准化)
String eventId = UUID.randomUUID().toString();
EventEnvelope<PointsPayload> envelope = EventEnvelope.builder()
.eventId(eventId)
.eventType("GRANT_POINTS")
.aggregateId("order:" + orderId)
.version(1)
.occurredAt(LocalDateTime.now())
.traceId(MDC.get("traceId"))
.payload(new PointsPayload(orderId, userId, points))
.build();
// 3. 保存消息记录(同一事务,原子性保证)
LocalMessage msg = LocalMessage.builder()
.eventId(eventId)
.bizType("GRANT_POINTS")
.bizId(orderId)
.routingKey("points.grant")
.content(JSON.toJSONString(envelope))
.status("PENDING")
.nextRetryTime(LocalDateTime.now())
.build();
messageMapper.insert(msg);
// 事务提交 → 订单状态和消息记录100%一致
}
}
3.3.3 步骤2:并发扫描发送(SKIP LOCKED + Publisher Confirms)
@Component
public class MessageScanTask {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initCallbacks() {
// Confirm回调:MQ Broker确认收到
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String eventId = correlationData != null ? correlationData.getId() : null;
if (ack) {
messageMapper.updateStatusByEventId(eventId, "SENT", LocalDateTime.now());
} else {
log.error("发布失败 eventId={}, cause={}", eventId, cause);
messageMapper.recordError(eventId, cause, calculateNextRetry());
}
});
// Return回调:路由失败
rabbitTemplate.setReturnsCallback(returned -> {
String eventId = returned.getMessage().getMessageProperties().getMessageId();
log.error("路由失败 eventId={}, reply={}", eventId, returned.getReplyText());
messageMapper.recordError(eventId, returned.getReplyText(), calculateNextRetry());
});
}
@Scheduled(cron = "*/5 * * * * ?") // 每5秒
public void scan() {
// 批量锁定(支持多实例并发,不重复)
List<LocalMessage> batch = messageMapper.lockBatchForSending(200);
for (LocalMessage msg : batch) {
try {
// 发送MQ(带CorrelationData用于Confirm回调)
CorrelationData correlation = new CorrelationData(msg.getEventId());
rabbitTemplate.convertAndSend(
defaultIfNull(msg.getExchange(), "maiban.events"),
msg.getRoutingKey(),
msg.getContent(),
message -> {
message.getMessageProperties().setMessageId(msg.getEventId());
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
correlation
);
// 注意:此处不更新状态,等Confirm回调时更新为SENT
} catch (Exception e) {
log.error("发送异常 eventId={}", msg.getEventId(), e);
messageMapper.recordError(msg.getEventId(), e.getMessage(), calculateNextRetry());
}
}
}
// 指数退避 + 随机抖动(避免重试风暴)
private LocalDateTime calculateNextRetry(int retryCount) {
long baseDelayMs = switch(retryCount) {
case 0 -> 5_000; // 5秒
case 1 -> 30_000; // 30秒
case 2 -> 300_000; // 5分钟
default -> 3600_000; // 1小时(超限后降级)
};
long jitter = (long)(baseDelayMs * 0.1 * Math.random()); // ±10%抖动
return LocalDateTime.now().plus(baseDelayMs + jitter, ChronoUnit.MILLIS);
}
}
Mapper SQL(并发安全锁定):
<select id="lockBatchForSending" resultType="LocalMessage">
SELECT id, event_id, biz_type, biz_id, exchange, routing_key, content
FROM local_message_table
WHERE status = 'PENDING'
AND next_retry_time <= NOW()
ORDER BY next_retry_time ASC, id ASC
LIMIT #{batchSize}
FOR UPDATE SKIP LOCKED
</select>
<update id="updateStatusByEventId">
UPDATE local_message_table
SET status = #{status}, sent_at = #{sentAt}, updated_at = NOW()
WHERE event_id = #{eventId}
</update>
3.3.4 步骤3:手动 ACK + 持久幂等消费
@Component
public class PointsConsumer {
@RabbitListener(queues = "GRANT_POINTS.queue")
public void handle(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String eventId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
try {
// 1. 持久幂等检查(数据库去重表)
if (!dedupRepository.markConsumed(eventId)) {
log.info("重复消息,已跳过 eventId={}", eventId);
channel.basicAck(deliveryTag, false);
return;
}
// 2. 解析事件
EventEnvelope<PointsPayload> envelope = JSON.parseObject(body,
new TypeReference<EventEnvelope<PointsPayload>>(){});
PointsPayload payload = envelope.getPayload();
// 3. 执行业务(业务层也需保证幂等)
pointsService.grant(payload.getUserId(), payload.getPoints(), eventId);
// 4. 手动ACK(业务成功后再ACK)
channel.basicAck(deliveryTag, false);
// 5. 更新出库表状态(可选,用于对账)
messageMapper.updateCompleted(eventId);
} catch (Exception e) {
log.error("消费失败 eventId={}", eventId, e);
// NACK不重回队列,进入DLQ或延迟队列
channel.basicNack(deliveryTag, false, false);
}
}
}
去重Repository(推荐用数据库而非短TTL Redis锁):
@Repository
public class EventDedupRepository {
public boolean markConsumed(String eventId) {
try {
jdbcTemplate.update(
"INSERT INTO event_dedup(event_id) VALUES(?)",
eventId
);
return true; // 首次消费
} catch (DuplicateKeyException e) {
return false; // 重复消费
}
}
}
四、实际业务场景
4.1 场景1:技师完单流程
业务需求:
技师完成服务 → 订单完成 → 佣金计算 → 师傅提成 → 积分发放 → 推送通知
实现代码:
// 主流程:快速响应(<100ms)
@Transactional
public void completeService(Long orderId) {
// 1. 更新订单状态
orderMapper.updateStatus(orderId, OrderStatus.COMPLETED);
// 2. 批量插入消息(一个事务)
messageMapper.batchInsert(Arrays.asList(
buildMessage("CALCULATE_COMMISSION", orderId), // 佣金
buildMessage("MENTOR_COMMISSION", orderId), // 师傅提成
buildMessage("GRANT_POINTS", orderId), // 积分
buildMessage("SEND_NOTIFICATION", orderId) // 通知
));
}
// 异步消费者1:佣金计算
@RabbitListener(queues = "CALCULATE_COMMISSION.queue")
public void handleCommission(OrderEvent event) {
BigDecimal amount = event.getAmount().multiply(new BigDecimal("0.15"));
commissionService.create(event.getNurseId(), amount);
}
// 异步消费者2:师傅提成
@RabbitListener(queues = "MENTOR_COMMISSION.queue")
public void handleMentor(OrderEvent event) {
MentorRelation relation = mentorService.get(event.getNurseId());
if (relation != null) {
BigDecimal amount = event.getAmount().multiply(new BigDecimal("0.02"));
commissionService.create(relation.getMentorId(), amount);
}
}
4.2 场景2:用户下单支付
业务需求:
创建订单 → 支付 → 通知技师 → 发放积分 → 发送短信
实现代码:
// 阶段1:创建订单(本地事务)
@Transactional
public Order createOrder(OrderDTO dto) {
Order order = new Order();
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
return order;
}
// 阶段2:支付(本地事务 + 消息)
@Transactional
public void payOrder(Long orderId) {
// 1. 更新订单状态
orderMapper.updateStatus(orderId, OrderStatus.PAID);
// 2. 保存后续流程消息
messageMapper.batchInsert(Arrays.asList(
buildMessage("NOTIFY_NURSE", orderId),
buildMessage("GRANT_POINTS", orderId),
buildMessage("SEND_SMS", orderId)
));
}
关键点:
- 主流程快速完成(<100ms)
- 后续流程异步处理,不阻塞用户
- 失败自动重试,不丢消息
五、特殊场景处理
5.1 涉及资金的退款场景
方案:仍然用本地消息表,不用Seata
实现:
// 退款主流程
@Transactional
public void refundOrder(Long orderId) {
Order order = orderMapper.selectById(orderId);
// 1. 更新订单状态为退款中
order.setStatus(OrderStatus.REFUNDING);
orderMapper.updateById(order);
// 2. 保存退款消息
LocalMessage msg = buildMessage("PROCESS_REFUND", orderId);
messageMapper.insert(msg);
}
// 退款消费者(支付服务)
@RabbitListener(queues = "PROCESS_REFUND.queue")
public void handleRefund(RefundEvent event) {
String lockKey = "refund:" + event.getOrderId();
if (redisTemplate.setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES)) {
try {
// 1. 调用微信/支付宝退款API
RefundResult result = wechatPayService.refund(event);
if (result.isSuccess()) {
// 2. 创建退款记录
paymentMapper.insertRefund(event.getOrderId(), event.getAmount());
// 3. 通知订单服务更新状态
orderService.updateStatus(event.getOrderId(), OrderStatus.REFUNDED);
// 4. 发送余额返还消息
LocalMessage balanceMsg = buildMessage("ADD_BALANCE", event);
messageMapper.insert(balanceMsg);
}
} catch (Exception e) {
log.error("退款失败", e);
throw e; // 触发MQ重试
}
}
}
// 余额返还消费者(用户服务)
@RabbitListener(queues = "ADD_BALANCE.queue")
public void handleAddBalance(BalanceEvent event) {
if (redisTemplate.setIfAbsent("balance:" + event.getOrderId(), "1")) {
userMapper.addBalance(event.getUserId(), event.getAmount());
}
}
为什么不用Seata:
- 退款是异步操作(微信/支付宝需要时间处理)
- 本地消息表能保证可靠性
- 性能好,不阻塞用户
5.2 秒杀库存扣减
方案:Redis预扣减 + 本地消息表
实现:
// 秒杀下单
public boolean seckillOrder(Long productId, Long userId) {
// 1. Redis原子扣减库存
Long stock = redisTemplate.opsForValue().decrement("seckill:stock:" + productId);
if (stock < 0) {
// 库存不足
redisTemplate.opsForValue().increment("seckill:stock:" + productId);
return false;
}
// 2. 创建订单 + 消息
createOrderWithMessage(productId, userId);
return true;
}
// 异步扣减数据库库存
@RabbitListener(queues = "DEDUCT_STOCK.queue")
public void deductStock(StockEvent event) {
stockMapper.deduct(event.getProductId(), event.getQuantity());
}
六、数据对账(兜底保障)
6.1 每日对账任务
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
public void dailyReconciliation() {
LocalDate yesterday = LocalDate.now().minusDays(1);
// 1. 查询昨日已支付订单
List<Order> paidOrders = orderMapper.selectPaidOrders(yesterday);
// 2. 查询对应支付流水
Map<Long, Payment> paymentMap = paymentService.batchQuery(orderIds);
// 3. 比对差异
List<Long> inconsistent = new ArrayList<>();
for (Order order : paidOrders) {
if (paymentMap.get(order.getId()) == null) {
inconsistent.add(order.getId());
}
}
// 4. 告警 + 自动补偿
if (!inconsistent.isEmpty()) {
alarmService.send("发现" + inconsistent.size() + "条差异");
compensationService.fix(inconsistent);
}
}
6.2 关键指标
| 指标 | 目标值 |
|---|---|
| 消息延迟 | < 10秒 |
| 对账差异率 | < 0.1% |
| 补偿成功率 | > 99% |
七、最佳实践
7.1 幂等性保证(多层防护)
7.1.1 推荐方案:持久去重表(最可靠)
优势:即使消息延迟数天后重投递也能去重,避免短TTL锁过期失效。
-- 去重表(已在2.2节定义)
CREATE TABLE event_dedup (
event_id CHAR(36) PRIMARY KEY,
consumed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
KEY idx_consumed_at (consumed_at)
);
@Repository
public class EventDedupRepository {
@Transactional
public boolean markConsumed(String eventId) {
try {
jdbcTemplate.update(
"INSERT INTO event_dedup(event_id, consumed_at) VALUES(?, NOW())",
eventId
);
return true; // 首次消费,允许处理
} catch (DuplicateKeyException e) {
return false; // 重复消费,已跳过
}
}
}
// 定期清理(保留7天即可)
@Scheduled(cron = "0 0 3 * * ?")
public void cleanupDedup() {
jdbcTemplate.update(
"DELETE FROM event_dedup WHERE consumed_at < DATE_SUB(NOW(), INTERVAL 7 DAY)"
);
}
7.1.2 业务唯一索引(核心防护)
关键业务必须有数据库唯一约束,作为最后防线。
-- 示例:积分发放
CREATE UNIQUE INDEX uk_points_event ON user_points_log(event_id);
-- 示例:退款记录
CREATE UNIQUE INDEX uk_refund_order ON refunds(order_id);
-- 示例:佣金发放
CREATE UNIQUE INDEX uk_commission_event ON commissions(event_id, nurse_id);
7.1.3 外部调用幂等键(支付/短信等)
// 退款必须带幂等键
RefundRequest request = RefundRequest.builder()
.outRefundNo(eventId) // 幂等键(对方去重)
.orderId(orderId)
.amount(amount)
.build();
RefundResult result = wechatPayService.refund(request);
// 我方也需记录,避免重复退款
refundMapper.insert(new Refund(eventId, orderId, amount));
7.1.4 状态机(辅助检查)
// 订单状态流转
@Transactional
public void updateOrderStatus(Long orderId, OrderStatus targetStatus) {
Order order = orderMapper.selectForUpdate(orderId);
if (!order.getStatus().canTransitionTo(targetStatus)) {
log.warn("状态转换非法 orderId={}, from={}, to={}",
orderId, order.getStatus(), targetStatus);
return; // 已处理过或状态非法
}
orderMapper.updateStatus(orderId, targetStatus);
}
7.1.5 Redis 短期锁(可选,限流场景)
仅用于需要即时去重的高频场景(如防刷、限流),不推荐作为主幂等手段。
// 防止5分钟内重复操作
String lockKey = "dedup:action:" + userId + ":" + actionType;
if (redisTemplate.setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES)) {
// 执行操作
}
幂等性分层总结:
- 消息层:event_dedup 表去重(T+7天有效)
- 业务层:唯一索引约束(永久有效)
- 外部层:第三方幂等键(对方去重)
- 状态层:状态机流转检查(逻辑保护)
7.2 失败重试策略(分层设计)
7.2.1 发布端重试(数据库扫描 + 抖动)
指数退避 + 随机抖动(已在 2.3.3 实现):
private LocalDateTime calculateNextRetry(int retryCount) {
long baseDelayMs = switch(retryCount) {
case 0 -> 5_000; // 5秒
case 1 -> 30_000; // 30秒
case 2 -> 300_000; // 5分钟
case 3 -> 1_800_000; // 30分钟
default -> 3_600_000; // 1小时(超限降级)
};
// ±10% 随机抖动,避免重试风暴
long jitter = (long)(baseDelayMs * 0.1 * (Math.random() * 2 - 1));
return LocalDateTime.now().plus(baseDelayMs + jitter, ChronoUnit.MILLIS);
}
// 超过5次失败 → 置为 FAILED,告警人工介入
if (msg.getRetryCount() >= 5) {
messageMapper.updateStatus(msg.getEventId(), "FAILED");
alarmService.sendAlert("消息发送失败超限", msg);
}
7.2.2 消费端重试(MQ 延迟队列 + DLX)
推荐:利用 RabbitMQ 插件/特性实现自动延迟重试,降低 DB 压力。
方案A:使用 x-delayed-message 插件(推荐)
// 1. 安装插件
// rabbitmq-plugins enable rabbitmq_delayed_message_exchange
// 2. 声明延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic");
return new CustomExchange("maiban.events.delayed", "x-delayed-message", true, false, args);
}
// 3. 消费失败时重投延迟队列
@RabbitListener(queues = "GRANT_POINTS.queue")
public void handle(Message message, Channel channel) throws IOException {
try {
// 业务处理...
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
Integer retryCount = (Integer) message.getMessageProperties()
.getHeader("x-retry-count");
retryCount = (retryCount == null) ? 0 : retryCount + 1;
if (retryCount >= 3) {
// 超限进DLQ
channel.basicNack(deliveryTag, false, false);
dlqHandler.handle(message);
} else {
// 延迟重投
long delayMs = calculateDelay(retryCount); // 5s, 30s, 5min
message.getMessageProperties().setHeader("x-delay", delayMs);
message.getMessageProperties().setHeader("x-retry-count", retryCount);
rabbitTemplate.convertAndSend(
"maiban.events.delayed",
message.getMessageProperties().getReceivedRoutingKey(),
message
);
channel.basicAck(deliveryTag, false); // ACK原消息
}
}
}
方案B:使用 TTL + DLX(无需插件)
// 1. 声明死信交换机与延迟队列
@Bean
public Queue retryQueue() {
return QueueBuilder.durable("GRANT_POINTS.retry")
.withArgument("x-dead-letter-exchange", "maiban.events")
.withArgument("x-dead-letter-routing-key", "points.grant")
.withArgument("x-message-ttl", 5000) // 5秒后重投
.build();
}
// 2. 消费失败投递到重试队列
catch (Exception e) {
rabbitTemplate.convertAndSend("GRANT_POINTS.retry", message);
channel.basicAck(deliveryTag, false);
}
7.2.3 DLQ(死信队列)处理
@Component
public class DLQHandler {
@RabbitListener(queues = "maiban.events.dlq")
public void handleDLQ(Message message) {
String eventId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody());
// 记录到数据库
dlqRecordMapper.insert(DLQRecord.builder()
.eventId(eventId)
.routingKey(message.getMessageProperties().getReceivedRoutingKey())
.body(body)
.failReason(message.getMessageProperties().getHeader("x-exception-message"))
.retryCount(message.getMessageProperties().getHeader("x-retry-count"))
.createdAt(LocalDateTime.now())
.build());
// 告警
alarmService.sendAlert("消息进入DLQ", eventId);
}
// 人工修复后批量回放
public void replayFromDLQ(List<String> eventIds) {
List<DLQRecord> records = dlqRecordMapper.selectByIds(eventIds);
for (DLQRecord record : records) {
rabbitTemplate.convertAndSend(
"maiban.events.delayed",
record.getRoutingKey(),
record.getBody(),
msg -> {
msg.getMessageProperties().setHeader("x-delay", 0);
msg.getMessageProperties().setHeader("x-retry-count", 0);
return msg;
}
);
}
}
}
7.2.4 重试策略总结
| 层级 | 机制 | 延迟策略 | 上限 | 超限处理 |
|---|---|---|---|---|
| 发布端 | DB扫描 + 定时任务 | 5s/30s/5m/30m/1h | 5次 | FAILED状态 + 告警 |
| 消费端 | MQ延迟队列 | 5s/30s/5m | 3次 | 进DLQ |
| DLQ | 人工回放 | 修复后按需 | - | 运维介入 |
7.3 监控告警与运维(可观测性)
7.3.1 核心监控指标
出库表(Outbox)指标:
// Prometheus 指标采集
@Component
public class OutboxMetrics {
@Scheduled(fixedRate = 10000) // 每10秒
public void collect() {
// 1. 各状态消息数量
Map<String, Long> countByStatus = messageMapper.countByStatus();
meterRegistry.gauge("maiban_outbox_pending", countByStatus.getOrDefault("PENDING", 0L));
meterRegistry.gauge("maiban_outbox_sending", countByStatus.getOrDefault("SENDING", 0L));
meterRegistry.gauge("maiban_outbox_failed", countByStatus.getOrDefault("FAILED", 0L));
// 2. 最老未发送消息的年龄(秒)
LocalDateTime oldest = messageMapper.selectOldestPending();
if (oldest != null) {
long ageSeconds = Duration.between(oldest, LocalDateTime.now()).getSeconds();
meterRegistry.gauge("maiban_outbox_oldest_pending_age_seconds", ageSeconds);
}
// 3. 过去1分钟发送成功/失败数
meterRegistry.counter("maiban_outbox_sent_total").increment(sentCount);
meterRegistry.counter("maiban_outbox_failed_total").increment(failedCount);
}
}
MQ 消费者指标:
@Component
public class ConsumerMetrics {
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public Object measureConsume(ProceedingJoinPoint joinPoint) throws Throwable {
String queueName = extractQueueName(joinPoint);
Timer.Sample sample = Timer.start(meterRegistry);
try {
Object result = joinPoint.proceed();
sample.stop(Timer.builder("maiban_consume_duration_ms")
.tag("queue", queueName)
.tag("status", "success")
.register(meterRegistry));
return result;
} catch (Exception e) {
sample.stop(Timer.builder("maiban_consume_duration_ms")
.tag("queue", queueName)
.tag("status", "failure")
.register(meterRegistry));
meterRegistry.counter("maiban_consume_fail_total", "queue", queueName).increment();
throw e;
}
}
}
关键指标清单:
| 指标 | 含义 | 告警阈值 | 处理建议 |
|---|---|---|---|
maiban_outbox_pending | PENDING状态消息数 | >1000 | 扩大批量/增加实例 |
maiban_outbox_oldest_pending_age_seconds | 最老消息滞留时长 | >300秒 | 检查MQ连接/网络 |
maiban_outbox_failed | FAILED状态消息数 | >10 | 人工介入排查 |
maiban_publish_confirm_latency_ms | 发布确认延迟 | P99>500ms | MQ性能排查 |
maiban_consume_duration_ms | 消费处理时延 | P95>3s | 业务逻辑优化 |
maiban_consume_fail_total | 消费失败总数 | 增长率>5%/min | 下游服务排查 |
maiban_dlq_message_count | DLQ堆积数 | >100 | 批量回放/修复 |
maiban_dedup_hit_rate | 去重命中率 | >10% | 排查重复投递原因 |
7.3.2 告警规则配置(Prometheus AlertManager)
groups:
- name: maiban_outbox_alerts
rules:
- alert: OutboxPendingHigh
expr: maiban_outbox_pending > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "出库表堆积超阈值"
description: "PENDING消息数: {{ $value }}"
- alert: OutboxOldestMessageStuck
expr: maiban_outbox_oldest_pending_age_seconds > 300
for: 2m
labels:
severity: critical
annotations:
summary: "消息滞留超5分钟"
description: "最老消息年龄: {{ $value }}秒"
- alert: ConsumeFailureRateHigh
expr: rate(maiban_consume_fail_total[1m]) > 0.05
for: 3m
labels:
severity: warning
annotations:
summary: "消费失败率 >5%"
- alert: DLQAccumulating
expr: maiban_dlq_message_count > 100
for: 10m
labels:
severity: critical
annotations:
summary: "DLQ堆积超100条"
7.3.3 运维 Runbook(故障处理手册)
场景1:出库表堆积(PENDING >1000)
# 1. 检查扫描任务是否正常
kubectl logs -f <pod-name> | grep MessageScanTask
# 2. 检查MQ连接
kubectl exec -it <pod-name> -- curl http://rabbitmq:15672/api/connections
# 3. 临时扩容扫描批量(热配置)
curl -X POST http://admin-api/config/update \
-d '{"key":"outbox.scan.batchSize","value":"500"}'
# 4. 增加扫描实例
kubectl scale deployment order-service --replicas=5
场景2:DLQ 告警(需批量回放)
# 1. 查询 DLQ 原因分布
SELECT fail_reason, COUNT(*)
FROM dlq_records
WHERE created_at > DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY fail_reason;
# 2. 修复根因(如下游服务恢复)后,批量回放
curl -X POST http://admin-api/dlq/replay \
-H "Content-Type: application/json" \
-d '{"eventIds": ["uuid1", "uuid2", ...], "delayMs": 0}'
# 3. 监控回放成功率
watch -n 1 'rabbitmqctl list_queues name messages'
场景3:消费延迟高(P95 >3s)
# 1. 定位慢消费队列
curl http://prometheus:9090/api/v1/query?query='topk(5, maiban_consume_duration_ms{quantile="0.95"})'
# 2. 查看该队列消费者日志
kubectl logs -f <pod-name> --tail=100 | grep "GRANT_POINTS"
# 3. 如是业务逻辑慢,优化代码;如是下游慢,增加超时/熔断
# 临时增加消费者并发
kubectl scale deployment marketing-service --replicas=10
场景4:Publisher Confirm 持续失败
# 1. 检查 RabbitMQ 健康
rabbitmqctl status
rabbitmqctl list_queues | grep -E "messages|consumers"
# 2. 检查网络连通性
kubectl exec -it <pod-name> -- telnet rabbitmq 5672
# 3. 降级为本地积压(等MQ恢复后自动发送)
# 无需人工干预,扫描任务会持续重试
7.3.4 日志规范与链路追踪
// 统一日志格式(携带 traceId、eventId)
@Slf4j
public class OrderService {
@Transactional
public void completeOrder(Long orderId) {
String traceId = MDC.get("traceId");
String eventId = UUID.randomUUID().toString();
log.info("订单完成 orderId={}, traceId={}, eventId={}", orderId, traceId, eventId);
// 业务逻辑...
MDC.put("eventId", eventId);
messageMapper.insert(msg);
log.info("出库消息已保存 eventId={}, bizType={}", eventId, "GRANT_POINTS");
}
}
// 消费端日志
@RabbitListener(queues = "GRANT_POINTS.queue")
public void handle(Message message) {
String eventId = message.getMessageProperties().getMessageId();
String traceId = message.getMessageProperties().getHeader("traceId");
MDC.put("traceId", traceId);
MDC.put("eventId", eventId);
log.info("开始消费 eventId={}, traceId={}", eventId, traceId);
// 业务处理...
log.info("消费完成 eventId={}, cost={}ms", eventId, cost);
}
7.3.5 可视化监控面板(Grafana)
推荐面板布局:
- 出库健康:PENDING/SENDING/FAILED 趋势图、最老消息年龄
- 发布性能:发送成功率、Confirm延迟分位图、失败原因TOP5
- 消费性能:各队列处理时延P50/P95/P99、消费失败率趋势
- 异常追踪:DLQ堆积、重复消费次数、超时告警列表
- 业务指标:订单完成→积分发放端到端时延、资金类事件成功率
八、总结
核心原则
唯一方案:本地消息表 + RabbitMQ
为什么是最优解:
- ✅ 性能优异:80ms响应,800 TPS吞吐
- ✅ 可靠性高:消息不丢失
- ✅ 实现简单:无需引入Seata Server
- ✅ 稳定性强:无单点风险
- ✅ 成本低:只需RabbitMQ
实施要点
- 所有跨服务操作都用本地消息表
- 消费者必须实现幂等性
- 定时扫描间隔5秒
- 失败重试最多3次
- 每日对账兜底
性能数据
- 主流程响应:< 100ms
- 消息延迟:< 10秒
- 系统吞吐:800 TPS
- 可用性:99.9%
维护者: 麦瓣健康技术团队
文档版本: v2.0(生产级优化版)
更新时间: 2025-10-21
