SpringBoot连接多RabbitMQ源

Springboot系列教程

在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就不太适用了,需要单独编写每个连接。

在SpringBoot框架中,我们常用的两个类一般是:

  • RabbitTemplate:作为生产、消费消息使用;
  • RabbitAdmin:作为申明、删除交换机和队列,绑定和解绑队列和交换机的绑定关系使用。

所以我们连接多个RabbitMQ就需要重新建立连接、重新实现这两个类。
代码如下:

配置

application.properties配置文件需要配置两个连接:

server.port=8080

# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v2.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v2.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5

# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v1.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v1.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5

重写连接工厂

需要注意的是,在多源的情况下,需要在某个连接加上@Primary注解,表示主连接,默认使用这个连接

package com.example.config.rabbitmq;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 */
@Configuration
public class MultipleRabbitMQConfig {

    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory hospSyncConnectionFactory(
            @Value("${v2.spring.rabbitmq.host}") String host,
            @Value("${v2.spring.rabbitmq.port}") int port,
            @Value("${v2.spring.rabbitmq.username}") String username,
            @Value("${v2.spring.rabbitmq.password}") String password,
            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v2RabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
        v2RabbitTemplate.setMandatory(mandatory);
        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
            } else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v2RabbitTemplate;
    }

    @Bean(name = "v2ContainerFactory")
    public SimpleRabbitListenerContainerFactory hospSyncFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v2RabbitAdmin")
    public RabbitAdmin iqianzhanRabbitAdmin(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }


    // mq主连接
    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v1RabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
        v1RabbitTemplate.setMandatory(mandatory);
        v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
            } else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v1RabbitTemplate;
    }

    @Bean(name = "v1ContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v1RabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

创建Exchange、Queue并绑定

再实现RabbitAdmin后,我们就需要根据RabbitAdmin创建对应的交换机和队列,并建立绑定关系

package com.example.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * 创建Queue、Exchange并建立绑定关系
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class MyRabbitMQCreateConfig {

    @Resource(name = "v2RabbitAdmin")
    private RabbitAdmin v2RabbitAdmin;

    @Resource(name = "v1RabbitAdmin")
    private RabbitAdmin v1RabbitAdmin;

    @PostConstruct
    public void RabbitInit() {
        v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));
        v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));
        v2RabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("queue.example.topic.new", true))        //直接创建队列
                        .to(new TopicExchange("exchange.topic.example.new", true, false))    //直接创建交换机 建立关联关系
                        .with("routing.key.example.new"));    //指定路由Key
    }
}

生产者

为了后续验证每个连接都建立成功,并且都能生产消息,生产者这里分别使用新生成的RabbitTemplate发送一条消息。

package com.example.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class TopicProducer {

    @Resource(name = "v1RabbitTemplate")
    private RabbitTemplate v1RabbitTemplate;

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public void sendMessageByTopic() {
        String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
        v1RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content1);

        String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content2);
    }
}

消费者

这里需要注意在配置消费队列时,需要标识ContainerFactory

package com.example.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {

    @RabbitHandler
    public void consumer(String message) {
        System.out.println(message);
    }
}

这样就完成了SpringBoot连接多个RabbitMQ源的示例了,再写一段测试代码验证下。

测试验证

package com.example.test;

import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {

    @Autowired
    private TopicProducer topicProducer;


    @Test
    public void topicProducerTest() {
        topicProducer.sendMessageByTopic();
    }
}

执行测试代码,验证结果为:

验证结果

验证SpringBoot连接多RabbitMQ源成功!

github地址:Spring Boot 教程、技术栈、示例代码
联系我

扫码关注公众号:java之旅

Image placeholder
lman
未设置
  50人点赞

没有讨论,发表一下自己的看法吧

推荐文章
SpringBoot个人应用开发框架(SpringBoot版本2.1)+IDEA

前言: 此笔记为本人首个SpringBoot项目框架学习实践记录,期间参考了许多大神的笔记和心得。 参考文档如下: 项目git地址: 一、创建SpringBoot工程 1.1创建父POM工程结

SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQSpringBoot框架已经提供了RabbitMQ的使用jar包,开发人员在使用RabbitMQ的时候只需要引用jar包简单的配置一下就可以使用RabbitMQ

SpringBoot 整合 Dubbo

1.整合dubbo 有的人或许会说已经有spring-cloud了,你整合dubbo干什么,其实没啥意图,主要就是想整合一下,毕竟dubbo在国内使用的还是很多的,你会一点点总不至于让你显得那么尴尬。

再见 Spring Boot 1.X ,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring官方在其博客宣布,SpringBoot1.x停止维护,SpringBoot1.x生命周期正式结束。其实早在2018年7月30号,Spring官方就已经在博客进行过预告,

SpringBoot 集成 JWT 实现 token 验证,token 注销

什么是JWT Jsonwebtoken(JWT),是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准((RFC7519).定义了一种简洁的,自包含的方法用于通信双方之间以JSON对象的形

springboot 多数据源,最简单的整合方式

简介 相信大家有配置过多数据源,或者即将配置多数据的朋友们,会发现网上大概有以下几种方案: 1.使用AOP切片进行动态数据源切换 2.使用MapperScan的basePackages配置不同的map

SpringBoot2.0 支持 https 访问

买了dapideng.com,自然要上https。 其实在之前的博客中,也早有提及配置证书的事儿,只不过这次变成了springboot,它内置了tomcat容器,和把项目打包放在tomcat下面不太一

SpringBoot 中的 Servlet Web 容器

1.前言 SpringBoot支持一下嵌入式Servlet容器: SpringBoot2.0.3.RELEASE需要Java8或9以及SpringFramework5.0.7.RELEASE或更高版本

SpringBoot 深度调优,让你的项目飞起来!

项目调优作为一名工程师,项目调优这事,是必须得熟练掌握的事情。在SpringBoot项目中,调优主要通过配置文件和配置JVM的参数的方式进行。一、修改配置文件关于修改配置文件application.p

使用Jenkins一键打包部署SpringBoot应用,就是这么6!

SpringBoot实战电商项目mall(25k+star)地址:https://github.com/macrozheng/mall 摘要任何简单操作的背后,都有一套相当复杂的机制。本文将以Spri

通过一个示例了解kafka connect连接器

什么是kafka连接器connect在实际工作中使用kafka,有时候会有类似这样的场景。我们需要把某些数据源的数据导入到kafka,或者把kafka作为数据源导出数据。或者两种场景的需求都要。这算是

Spring-SpringAOP原理,手写Spring事务框架

一、Spring核心知识Spring是一个开源框架,Spring是于2003年兴起的一个轻量级的Java开发框架,由RodJohnson在其著作ExpertOne-On-OneJ2EEDevelopm

云端的生存之道,第 2 单元:将 Spring Boot 应用程序连接到云托管的数据库

前提条件 本系列教程的第1部分,因为本教程直接以第1部分中的课程内容和完成的操作为基础。 一个IBMCloud帐户 云原生数据持久性 IBMCloud提供了许多可持久存储数据的选项。在本教程中,我

基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

#介绍 基于Hyperf+WebSocket+RabbitMQ实现的一个简单大屏幕的消息推送。 #思路 利用WebSocket协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端id。订阅发

你知道如何自动保存 Spring Boot 应用进程号吗

1.前言 欢迎阅读SpringBoot2实战系列文章PID对于系统运维来说并不陌生,但是对于一些开发者特别是新手还是要简单介绍一下的。它是ProcessID的简称,是系统分配给一个进程的唯一标识符,

云端的生存之道,第 1 单元:将 Spring Boot 部署到 Kubernetes

初始化Kubernetes集群 第一步是初始化IBMCloud上的Kubernetes集群。IBMCloud可能需要几分钟时间来启动新的Kubernetes集群;因此,通过先执行初始化操作,可以在后台

Spring Boot自动装配整理

首先写一个我们自己的HelloWorld配置类 1、基于"注解驱动"实现@Enable模块 @ConfigurationpublicclassHelloWorldConfiguration{@Bean

Spring Boot 高效数据聚合之道

项目地址和示例代码:https://github.com/lvyahui8/spring-boot-data-aggregator 背景 接口开发是后端开发中最常见的场景,可能是RESTFul接口,也

Spring boot 如何快速的配置多个 Redis 数据源

Redis简介 redis多数据源主要的运用场景是在需要使用多个redis服务器或者使用多个redis库,本文采用的是fastdep依赖集成框架,快速集成Redis多数据源并集成lettuce连接池,

Spring Boot到底是怎么运行的,你知道吗?

导读SpringBoot方式的项目开发已经逐步成为Java应用开发领域的主流框架,它不仅可以方便地创建生产级的Spring应用程序,还能轻松地通过一些注解配置与目前比较流行的微服务框架SpringCl

Spring Boot 面试,一个问题就干趴下了!

随着SpringBoot使用越来越广泛,SpringBoot已经成为Java程序员面试的知识点,很多同学对SpringBoot理解不是那么深刻,经常就会被几个连环跑给干趴下了!比如下面这一段的Spri

使用 Docker 部署 Spring Boot 项目

Docker技术发展为微服务落地提供了更加便利的环境,使用Docker部署SpringBoot其实非常简单,这篇文章我们就来简单学习下。首先构建一个简单的SpringBoot项目,然后给项目添加Doc

Spring Boot 中关于自定义异常处理的套路!

在SpringBoot项目中,异常统一处理,可以使用Spring中@ControllerAdvice来统一处理,也可以自己来定义异常处理方案。SpringBoot中,对异常的处理有一些默认的策略,我们

Github 上 Star 最多的 Spring Boot 个人开源学习项目

2016年,在一次技术调研的过程中认识到了SpringBoot,试用之后便一发不可收拾的爱上它。为了防止学习之后忘记,就在网上连载了 SpringBoot系列文章,没想到这一开始便与SpringBoo

Spring Boot 中的响应式编程和 WebFlux 入门

Spring5.0中发布了重量级组件Webflux,拉起了响应式编程的规模使用序幕。WebFlux使用的场景是异步非阻塞的,使用Webflux作为系统解决方案,在大多数场景下可以提高系统吞吐量。Spr