SpringBoot + RabbitMQ:轻松实现邮件大批量异步推送!

在之前的文章中,我们详细介绍了 SpringBoot 整合 mail 实现各类邮件的自动推送服务。

但是这类服务通常不稳定,当出现网络异常的时候,会导致邮件推送失败;同时,如果大批量的同步推送邮件,可靠性也不高。

本篇文章将介绍另一种高可靠的服务架构,实现邮件 100% 被投递成功。类似的短信自动发送等服务也大体相同。

01、先来一张流程图

本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖了 RabbitMQ 很多知识点,如:

  • 生产者和消费者模型
  • 消息发送确认机制
  • 消费确认机制
  • 消息的重新投递
  • 消费幂等性, 等等

02、实现思路

  • 1.创建一台 Linux 服务器,并安装 RabbitMQ
  • 2.开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
  • 3.创建邮件发送项目并编写代码
  • 4.发送邮件测试
  • 5.消息发送失败处理

03、环境准备

获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:

点击【开启】按钮,然后发送短信,即可获取授权码,该授权码就是配置文件spring.mail.password需要的密码!

04、项目介绍

  • springboot版本:2.1.5.RELEASE
  • RabbitMQ版本:3.6.5
  • SendMailUtil:发送邮件工具类
  • ProduceServiceImpl:生产者,发送消息
  • ConsumerMailService:消费者,消费消息,发送邮件

05、代码实现

5.1、创建项目

在 IDEA 下创建一个名称为smail的 Springboot 项目,pom文件中加入amqpmail

<dependencies>
    <!--spring boot核心-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--开发环境调试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <optional>true</optional>
    </dependency>
    <!--mail 支持-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-mail</artifactId>
    </dependency>
    <!--amqp 支持-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.4</version>
    </dependency>
    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
</dependencies>

5.2、配置rabbitMQ、mail

application.properties文件中,配置amqpmail

#rabbitmq
spring.rabbitmq.host=192.168.0.103
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100

# mail
spring.mail.default-encoding=UTF-8
spring.mail.host=smtp.qq.com
spring.mail.username=xxxx@qq.com
spring.mail.password=获取的邮箱授权码
spring.mail.from=xxxx@qq.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

其中,spring.mail.password第四步中获取的授权码,同时usernamefrom要一致!

5.3、RabbitConfig配置类

@Configuration
@Slf4j
public class RabbitConfig {

    // 发送邮件
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange");
            } else {
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, truefalse);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }
}

5.4、Mail 邮件实体类

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Mail {

    @Pattern(regexp = "^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$", message = "邮箱格式不正确")
    private String to;

    @NotBlank(message = "标题不能为空")
    private String title;

    @NotBlank(message = "正文不能为空")
    private String content;

    private String msgId;// 消息id
}

5.5、SendMailUtil邮件发送类

@Component
@Slf4j
public class SendMailUtil {

    @Value("${spring.mail.from}")
    private String from;

    @Autowired
    private JavaMailSender mailSender;

    /**
     * 发送简单邮件
     *
     * @param mail
     */

    public boolean send(Mail mail) {
        String to = mail.getTo();// 目标邮箱
        String title = mail.getTitle();// 邮件标题
        String content = mail.getContent();// 邮件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);

        try {
            mailSender.send(message);
            log.info("邮件发送成功");
            return true;
        } catch (MailException e) {
            log.error("邮件发送失败, to: {}, title: {}", to, title, e);
            return false;
        }
    }
}

5.6、ProduceServiceImpl 生产者类

@Service
public class ProduceServiceImpl implements ProduceService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean send(Mail mail) {
        //创建uuid
        String msgId = UUID.randomUUID().toString().replaceAll("-""");
        mail.setMsgId(msgId);
        
        //发送消息到rabbitMQ
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);

        return true;
    }
}

5.7、ConsumerMailService 消费者类

@Component
@Slf4j
public class ConsumerMailService {

    @Autowired
    private SendMailUtil sendMailUtil;

    @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
    public void consume(Message message, Channel channel) throws IOException {
        //将消息转化为对象
        String str = new String(message.getBody());
        Mail mail = JsonUtil.strToObj(str, Mail.class);
        log.info("收到消息: {}", mail.toString());

        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();

        boolean success = sendMailUtil.send(mail);
        if (success) {
            channel.basicAck(tag, false);// 消费确认
        } else {
            channel.basicNack(tag, falsetrue);
        }
    }
}

5.8、TestController 控制层类

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Autowired
    private ProduceService testService;

    @PostMapping("send")
    public boolean sendMail(Mail mail) {
        return testService.send(mail);
    }
}

06、测试服务

启动 SpringBoot 服务之后,用 postman 模拟请求接口。

查看控制台信息。

查询接受者邮件信息。

邮件发送成功!

07、消息发送失败处理

虽然,上面案例可以成功的实现消息的发送,但是上面的流程很脆弱,例如: rabbitMQ 突然蹦了、邮件发送失败了、重启 rabbitMQ 服务器出现消息重复消费,应该怎处理呢?

很显然,我们需要对原有的逻辑进行升级改造,因此我们需要引入数据库来记录消息的发送情况。

7.1、创建消息投递日志表

CREATE TABLE `msg_log` (
  `msg_id` varchar(255NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`USING BTREE
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

7.2、编写 MsgLog 相关服务类

public interface MsgLogService {

    /**
     * 插入消息日志
     * @param msgLog
     */

    void insert(MsgLog msgLog);

    /**
     * 更新消息状态
     * @param msgId
     * @param status
     */

    void updateStatus(String msgId, Integer status);

    /**
     * 查询消息
     * @param msgId
     * @return
     */

    MsgLog selectByMsgId(String msgId);
}

7.3、改写服务逻辑

在生产服务类中,新增数据写入。

同时,在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑。

改造消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据!

这样即可保证,如果 rabbitMQ 服务器,即使重启之后重新推送消息,通过数据库判断,也不会重复消费进而发生业务异常!

7.4、利用定数任务对消息投递失败进行补偿

当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递。

利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!

08、总结

本文主要是通过发送邮件这个业务案例,来讲解 Springboot 与 rabbitMQ 技术的整合和使用!

当然解决这个业务需求的技术方案还有很多,例如 Springboot 与 rocketMQ 也可以实现这个需求,这个会在后期的文章讲解!

想要获取项目源代码的小伙伴,可以通过如下地址获取!

https://gitee.com/pzblogs/spring-boot-example-demo

09、参考

1、https://www.jianshu.com/p/dca01aad6bc8

最后,送大家一份Java 生态知识体系/面试必看资料还在找工作的小伙伴看过来啦!好多读者都问我有没有面试题神器,我苦苦整了一份内部资料,Java 生态知识体系/面试必看资料!
据说已经有小伙伴通过这套资料,成功的入职了蚂蚁金服、字节跳动等大厂
而且,这些资料不是扫描版的,里面的文字都可以直接复制,非常便于我们学习:




如果你想获得完整PDF可以通过以下方式获得

面试大全怎么获取:
1.加我微信备注【Java极客技术 2.后台回复【pdf即可。

喜欢就分享
认同就点赞

支持就在看

一键四连,你的offer也四连

相关推荐

  • CCL2024·第二十三届中国计算语言大会讲习班公布
  • 会议开幕倒计时三天!CCAC 2024 主要报告介绍
  • 大模型集体失智!9.11和9.9哪个大,几乎全翻车了
  • [开源]一款MES系统基础上进行二次开发的ERP系统,高效智能的运营
  • Meta开发System 2蒸馏技术,Llama 2对话模型任务准确率接近100%
  • ECCV 2024 | 模型逆向攻击高性能新范式,人脸隐私安全问题新思考
  • 对齐全量微调!这是我看过最精彩的LoRA改进
  • 多人同时导出 Excel 干崩服务器?大佬给出的解决方案太优雅了!
  • 6个强大且流行的Python爬虫库,强烈推荐!
  • Spring Cloud Eureka快读入门Demo
  • 菊花开!!!带您深入菊花链
  • 6步!!!用 Electron开发一个记事本
  • 113K Star微软甄选!!!用这个框架开发百万人爱的VSCode
  • 月之暗面新活:Kimi浏览器插件
  • 聪明的大模型都认为9.11 大于 9.9……
  • AMD与国产AI芯势力创始人领衔!2024全球AI芯片峰会首批嘉宾公布,报名正式开启
  • 顶级AI投资人发起中国大模型群聊:十大趋势、具身智能、AI超级应用
  • 一年变现1500w,如何从0-1做一家小吃加盟店?
  • 博客园再次发出求救信
  • 牛皮!我被银行码农的工资惊到了