菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
335
0

Java框架-ActiveMQ

原创
05/13 14:22
阅读数 89427
 适用场景:
商品修改了名称,商品详情模块、搜索模块、其他业务模块中的“商品名称”也修改
 
ActiveMQ的消息形式
对于消息的传递有两种类型
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  · StreamMessage -- Java原始值的数据流
  · MapMessage--一套名称-值对
  · TextMessage--一个字符串对象
  · ObjectMessage--一个序列化的 Java对象
  · BytesMessage--一个字节的数据流
 
安装步骤:
第一步: 把ActiveMQ 的压缩包上传到Linux系统。
第二步:解压缩。
第三步:启动。
使用bin目录下的activemq命令
 
启动:
[root@localhost bin]# ./activemq start
关闭:
[root@localhost bin]# ./activemq stop
查看状态:
[root@localhost bin]# ./activemq status
 
工程需要添加jar包:
 
 
1.1. Queue
1.1.1.    Producer
生产者:生产消息,发送端。
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
@Test
      public void testQueueProducer() throws Exception {
           // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
           //brokerURL服务器的ip及端口号
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
           // 第二步:使用ConnectionFactory对象创建一个Connection对象。
           Connection connection = connectionFactory.createConnection();
           // 第三步:开启连接,调用Connection对象的start方法。
           connection.start();
           // 第四步:使用Connection对象创建一个Session对象。
           //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
           //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
           //参数:队列的名称。
           Queue queue = session.createQueue("queue-test");
           // 第六步:使用Session对象创建一个Producer对象。
           MessageProducer producer = session.createProducer(queue);
           // 第七步:创建一个Message对象,创建一个TextMessage对象。
           /*TextMessage message = new ActiveMQTextMessage();
           message.setText("hello activeMq,this is my first test.");*/
           TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
           // 第八步:使用Producer对象发送消息。
           producer.send(textMessage);
           // 第九步:关闭资源。
           producer.close();
           session.close();
           connection.close();
      }
 
1.1.2.    Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
public class QueueCustomer {
      @Test
      public void recieve() throws Exception{
           // 1.创建一个连接工厂 Activemq的连接工厂)参数:指定连接的activemq的服务
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
           // 2.获取连接
           Connection connection = connectionFactory.createConnection();
           // 3.开启连接
           connection.start();
           // 4.根据连接对象创建session
           // 第一个参数:表示是否使用分布式事务(JTA
           // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
           Queue queue = session.createQueue("queue-test");
           // 6.创建消费者
           MessageConsumer consumer = session.createConsumer(queue);
           //7.接收消息
           //第一种
//               while(true){
//                    //接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
//                    Message message = consumer.receive(10000);
//                    //如果message为空,没有接收到消息了就跳出
//                    if(message==null){
//                          break;
//                    }
//                   
//                    if(message instanceof TextMessage){
//                          TextMessage messaget = (TextMessage)message;
//                          System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容
//                    }
//               }
           System.out.println("start");
           //第二种:
                 //设置监听器,其实开启了一个新的线程。
           consumer.setMessageListener(new MessageListener() {
                 //接收消息,如果有消息才进入,如果没有消息就不会进入此方法
                 @Override
                 public void onMessage(Message message) {
                      if(message instanceof TextMessage){
                            TextMessage messaget = (TextMessage)message;
                            try {
                                  //获取消息内容
                                  System.out.println(">>>获取的消息内容:"+messaget.getText());
                            } catch (JMSException e) {
                                  e.printStackTrace();
                            }
                      }
                 }
           });
      System.out.println("end");
           Thread.sleep(10000);//睡眠10秒钟。
          
           // 9.关闭资源
           consumer.close();
           session.close();
           connection.close();
      }
}
 
1.2. Topic
 
 
1.2.1.    Producer
使用步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
 
public class TopicProducer {
      @Test
      public void send() throws Exception{
           // 1.创建一个连接工厂 Activemq的连接工厂)参数:指定连接的activemq的服务
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
           // 2.获取连接
           Connection connection = connectionFactory.createConnection();
           // 3.开启连接
           connection.start();
           // 4.根据连接对象创建session
           // 第一个参数:表示是否使用分布式事务(JTA
           // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           // 5.根据session创建Destination(目的地,queue topic,这里使用的是topic)
           Topic topic = session.createTopic("topic-test");//---------------------
           // 6.创建生产者
           MessageProducer producer = session.createProducer(topic);
           // 7.构建消息对象,(构建发送消息的内容) 字符串类型的消息格式(TEXTMessage
           TextMessage textMessage = new ActiveMQTextMessage();
           textMessage.setText("发送消息123");// 消息的内容
           // 8.发送消息
           producer.send(textMessage);
           // 9.关闭资源
           producer.close();
           session.close();
           connection.close();
      }
}
 
1.2.2.    Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
public class TopicCustomer1 {
      @Test
      public void reieve() throws Exception{
 
           // 1.创建一个连接工厂 Activemq的连接工厂)参数:指定连接的activemq的服务
           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
           // 2.获取连接
           Connection connection = connectionFactory.createConnection();
           // 3.开启连接
           connection.start();
           // 4.根据连接对象创建session
           // 第一个参数:表示是否使用分布式事务(JTA
           // 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           // 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
           Topic topic = session.createTopic("topic-test");//---------------------
           // 6.创建消费者
           MessageConsumer consumer = session.createConsumer(topic);
           // 7.接收消息
           while(true){
                 //接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
                 Message message = consumer.receive(100000);
                 //如果message为空,没有接收到消息了就跳出
                 if(message==null){
                      break;
                 }
                
                 if(message instanceof TextMessage){
                      TextMessage messaget = (TextMessage)message;
                      System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容
                 }
           }
           // 第二种:
           // 设置监听器,其实开启了一个新的线程。
//         consumer.setMessageListener(new MessageListener() {
//               // 接收消息,如果有消息才进入,如果没有消息就不会进入此方法
//               @Override
//               public void onMessage(Message message) {
//                    if (message instanceof TextMessage) {
//                          TextMessage messaget = (TextMessage) message;
//                          try {
//                                // 获取消息内容
//                                System.out.println(">>>获取的消息内容:" + messaget.getText());
//                          } catch (JMSException e) {
//                                e.printStackTrace();
//                          }
//                    }
//               }
//         });
           //Thread.sleep(10000);// 睡眠10秒钟。
 
           // 9.关闭资源
           consumer.close();
           session.close();
           connection.close();
 
      }
}
 
1.3. 小结
queue 是点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。
topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。
 
queue 默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。
topic 默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。
----------------------
---------------------
 
与Spring整合
 
 
1JMS
1.1》什么是JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
 
1.2》为什么要学JMS
在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。
JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。
 
1.3JMS有什么优势
异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。
 
可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。
 
1.4JMS数据交互的两种方式
点对点消息模型
(1)、每个消息只有一个接受者(自己测试了一下,可以有多个接受者,但是当有多个接收者时,每个接收者只能获取随机的几条信息)
 
(2)、消息发送者和消息接受者并没有时间依赖性。
 
(3)、当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;
                                        
(4)、当接收者收到消息的时候,会发送确认收到通知(acknowledgement)。
 
(5)、点对点消息模型图:
 
 
发布/订阅消息模型
(1)、一个消息可以传递给多个订阅者
 
(2)、发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。
 
(3)、为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
 
(4)、发布/订阅消息模型图:
 
 
2ActiveMQ
2.1》什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
 
2.2ActiveMQ的下载
主页:http://activemq.apache.org/ 目前最新版本:5.15.3
 
开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html
 
ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin
 
2.3ActiveMQ的点对点使用
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
 
Session.CLIENT_ACKNOWLEDGE客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。
 
Session.DUPS_ACKNOWLEDGE该选择只是会话迟钝的确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置
 
为 true
消息生产者
public class JMSProducer {
 
 
 
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
 
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
 
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
 
 
 
    public static void main(String[] args) {
 
 
 
 
 
        ConnectionFactory connectionFactory;//连接工厂
 
        Connection connection = null;//连接
 
        Session session;//会话
 
        Destination destination;//消息目的地
 
        MessageProducer messageProducer;//消息生产者
 
 
 
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
 
 
 
        try {
 
            connection = connectionFactory.createConnection();//通过工厂获取连接
 
            connection.start();//启动连接
 
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
 
            destination = session.createQueue("新闻队列");
 
            messageProducer = session.createProducer(destination);//创建消息生产者
 
 
 
            //发送消息
 
            for(int i=0;i<10;i++){
 
                TextMessage msg = session.createTextMessage("郭永峰IT工作室客服" + (i + 1) +"号");
 
                messageProducer.send(destination,msg);
 
            }
 
 
 
            session.commit();
 
 
 
        }catch (Exception e){
 
            e.printStackTrace();
 
        }finally {
 
            if(connection!=null){
 
                try {
 
                    connection.close();
 
                } catch (JMSException e) {
 
                    // TODO Auto-generated catch block
 
                    e.printStackTrace();
 
                }
 
            }
 
        }
 
    }
 
}
 
 
 
消息消费者
public class JMSConsumer {
 
 
 
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
 
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
 
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
 
 
 
    public static void main(String[] args) {
 
 
 
 
 
        ConnectionFactory connectionFactory;//连接工厂
 
        Connection connection = null;//连接
 
        Session session;//会话
 
        Destination destination;//消息目的地
 
        MessageConsumer consumer;//消息消费者
 
 
 
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
 
 
 
        try {
 
            connection = connectionFactory.createConnection();//通过工厂获取连接
 
            connection.start();//启动连接
 
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
 
            destination = session.createQueue("新闻队列");
 
            consumer = session.createConsumer(destination);//创建消息消费者
 
 
 
            //发送消息
 
           while(true){
 
               TextMessage message = (TextMessage) consumer.receive(10000);
 
               if(message != null){
 
                   System.out.println(message.getText());//获取消息
 
               }
 
           }
 
 
 
 
 
        }catch (Exception e){
 
            e.printStackTrace();
 
        }
 
}
 
 
 
接收消息监听器
 

 

 

等级: 二级丙等 类型: 公立
医保定点: 医保 类别: 综合

发表评论

0/200
335 点赞
0 评论
收藏
为你推荐 换一批