RabbitMQ 高可用优化

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

RabbitMQ 高可用优化

来源:my.oschina.net/xiaolyuh/blog/3110049

RabbitMQ的主要作用基本上可以用8个字概括,削峰填谷异步解耦。但是引入MQ我们也不得不考虑引入MQ后带来的一些问题,如消息丢失。

在一些业务场景不一样,处理方式也就不一样,比如发短信,日志收集我们主要看吞吐量所以对消息丢失容忍度较高,这类场景基本上不用花太多时间在消息丢失问题上。

另外一种,如我们用MQ来做分布式事务,续保计算,提成的计算,这类业务对消息丢失容忍度较底,所以我们一定要考虑消息丢失的问题。这次分享的内容是怎么来最大限制的防止消息丢失,顺带提一下消息的重发和重复消费。

RabbitMQ 模型图

RabbitMQ 高可用优化

ConfirmCallback和ReturnCallback

在这个里我们主要实现了ConfirmCallback和ReturnCallback两个接口。这两个接口主要是用来发送消息后回调的。因为rabbit发送消息是只管发,至于发没发成功,发送方法不管。

  • ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。

  • ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。发生网络分区会出现这种情况。

在这里一定要把这两个开关打开, publisher-confirms="true" publisher-returns="true"。

生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。

消息的日志链路跟踪

使用MQ来解耦服务,异步化处理一些复杂耗时逻辑,但是也带来了一个问题。由于异步化以后,排查问题就很不方便了,根本不知道这个消息什么时候消费,消费的日志也很不好排查。所以引入了Slf4j MDC机制将主线程的日志链路和消息的日志链路连起来,方便MQ问题的排查。

RabbitSender

import com.alibaba.fastjson.JSON;
import com.wlqq.insurance.common.enums.MetricNameEnum;
import com.wlqq.insurance.common.enums.SystemTypeEnum;
import com.wlqq.insurance.common.log.core.FisLoggerFactory;
import com.wlqq.insurance.common.mq.CorrelationData;
import com.wlqq.insurance.common.service.AlertService;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.UUID;

/**
 * Rabbit 发送消息
 *
 * @author yuhao.wang
 */

public class RabbitSender implements RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnCallbackInitializingBean {
    private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);

    @Value("${mq.retry.count}")
    private int mqRetryCount;

    /**
     * 告警服务
     */

    @Autowired
    private AlertService alertService;

    /**
     * Rabbit MQ 客户端
     */

    private RabbitTemplate rabbitTemplate;

    /**
     * 发送MQ消息,异步
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由名称
     * @param message      发送消息体
     */

    public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息体不能为NULL");
        Assert.notNull(exchangeName, "exchangeName 不能为NULL");
        Assert.notNull(routingKey, "routingKey 不能为NULL");
        // 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 发送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * RPC方式,发送MQ消息
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由名称
     * @param message      发送消息体
     */

    public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息体不能为NULL");
        Assert.notNull(exchangeName, "exchangeName 不能为NULL");
        Assert.notNull(routingKey, "routingKey 不能为NULL");
        // 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);

        rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器后接收ack回调。
     * 如果消息发送确认失败就进行重试。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */

    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        CorrelationData correlationDataExtends = null;
        if (correlationData instanceof CorrelationData) {
            correlationDataExtends = (CorrelationData) correlationData;
            if (correlationDataExtends.getMdcContainer() != null) {
                // 日志链路跟踪
                MDC.setContextMap(correlationDataExtends.getMdcContainer());
            }
        }

        // 消息回调确认失败处理
        if (!ack) {
            if (correlationDataExtends != null) {
                //消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
                if (correlationDataExtends.getRetryCount() < mqRetryCount) {
                    logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
                            correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                    // 将重试次数加一
                    correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                    // 重发发消息
                    this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                            correlationDataExtends.getMessage(), correlationDataExtends);
                } else {
                    //消息重试发送失败,将消息放到数据库等待补发
                    logger.error("MQ消息重发失败,消息ID:{},消息体:{}", correlationData.getId(),
                            JSON.toJSONString(correlationDataExtends.getMessage()));

                    alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
                            correlationDataExtends.getExchange(), null);
                }
            }
        } else {
            logger.info("消息发送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
     * 在脑裂的情况下会出现这种情况。
     */

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化消息
        Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
        if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
            // 日志链路跟踪
            MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
        }

        logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));

        alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
    }

    /**
     * 消息相关数据(消息ID)
     *
     * @param message   消息体
     * @param messageId 消息ID
     * @return
     */

    private CorrelationData correlationData(Object message, String messageId) {
        // 消息ID默认使用UUID
        if (StringUtils.isEmpty(messageId)) {
            messageId = UUID.randomUUID().toString();
        }
        return new CorrelationData(messageId, message);
    }

    /**
     * 发送消息
     *
     * @param exchange        交换机名称
     * @param routingKey      路由key
     * @param message         消息内容
     * @param correlationData 消息相关数据(消息ID)
     * @throws AmqpException
     */

    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}

CorrelationData

import lombok.Data;
import org.slf4j.MDC;

import java.util.Map;

/**
 * 发送消息的相关数据
 *
 * @author yuhao.wang
 */

@Data
public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {


    /**
     * MDC容器
     * 获取父线程MDC中的内容,做日志链路
     */

    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息体
     */

    private volatile Object message;

    /**
     * 交换机名称
     */

    private String exchange;

    /**
     * 路由key
     */

    private String routingKey;

    /**
     * 重试次数
     */

    private int retryCount = 0;

    public CorrelationData(String id) {
        super(id);
    }

    public CorrelationData(String id, Object data) {
        this(id);
        this.message = data;
    }
}

Message

/**
 * MQ消息的父类消息体
 *
 * @author yuhao.wang
 */

@Data
public class Message implements Serializable {
    private static final long serialVersionUID = -4731326195678504565L;

    /**
     * MDC容器
     * 获取父线程MDC中的内容,做日志链路
     */

    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息ID(消息的唯一标示)
     */

    private String messageId;
}

AbstractConsumer

/**
 * 默认消费者
 *
 * @author yuhao.wang3
 */

public abstract class AbstractConsumer implements MessageListener {
    private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);

    @Override
    public void onMessage(Message msg) {
        String body = null;

        try {
            // 日志链路跟踪逻辑
            body = new String(msg.getBody(), "utf-8");
            DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
            Map<String, String> container = message.getMdcContainer();
            if (container != null) {
                // 日志链路跟踪
                MDC.setContextMap(message.getMdcContainer());
            }
        } catch (Exception e) {
            LOGGER.warn("没有找到MQ消息日志链路数据,无法做日志链路追踪");
        }

        try {
            // 处理消息逻辑
            doMessage(msg);
            LOGGER.info("成功处理MQ消息, 消息体:{}", body);
        } catch (Exception e) {
            LOGGER.error("处理MQ消息异常 {}, 消息体:{}", JSON.toJSONString(msg), body, e);
        }
    }

    /**
     * 处理消息的实现方法
     *
     * @param msg
     */

    public abstract void doMessage(Message msg);
}

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

推荐好文

强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

RabbitMQ 高可用优化

原文始发于微信公众号(Java笔记虾):RabbitMQ 高可用优化