Installing Maven, Ant, Git, and RocketMQ 4.2 on CentOS 7 (installation succeeded, client calls failed)

Original
05/13 14:22
1. Maven installation
wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
yum -y install apache-maven
2. Ant installation
yum -y install ant

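3. Git installation
yum install git
# Check the version
git --version
# Output of the form "git version <version number>" means the install succeeded
# Configure the Git user name and email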
git config --global user.name "Your Name"
git config --global user.email "user@youremail" 
4. RocketMQ installation (note: the version here is 4.2.0)

mkdir -p /usr/local/rocketmq   # create the directory if it does not exist
cd /usr/local/rocketmq
git clone -b develop https://github.com/apache/incubator-rocketmq.git

cd incubator-rocketmq
mvn -Prelease-all -DskipTests clean install -U

------------------

(a long wait while Maven downloads dependencies)


-------------------

cd distribution/target/apache-rocketmq

2) Configuration
Open /etc/profile with vim and add:

 

#apache rocket-mq
export ROCKETMQ_HOME=/usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
export NAMESRV_ADDR=your-server-ip:9876

   Make the profile take effect:

   source /etc/profile

  Go to the /usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq/bin directory and add execute permission:

  chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
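
A quick sanity check can confirm that the profile change and the chmod both took effect before anything is started. The following is only a minimal sketch: `check_rocketmq_env` is a hypothetical helper name, not something RocketMQ ships, and the script names are simply the ones from the chmod command above.

```shell
# Hypothetical helper (not part of RocketMQ): verify ROCKETMQ_HOME and its scripts.
check_rocketmq_env() {
    if [ -z "$ROCKETMQ_HOME" ] || [ ! -d "$ROCKETMQ_HOME/bin" ]; then
        echo "ROCKETMQ_HOME is unset or has no bin directory"
        return 1
    fi
    missing=0
    # Same script names as in the chmod command.
    for script in mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv; do
        if [ ! -x "$ROCKETMQ_HOME/bin/$script" ]; then
            echo "not executable: $script"
            missing=1
        fi
    done
    [ "$missing" -eq 0 ] && echo "ok"
    return "$missing"
}
```

Running `check_rocketmq_env` in a fresh login shell also shows whether /etc/profile was picked up, since an unset ROCKETMQ_HOME is reported first.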

  Start the name server:

 

nohup  mqnamesrv & 

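// Check the startup log; nohup writes it to nohup.out in the directory where the command was run (bin here)
tail -f nohup.out
// A startup-success message in the output means the name server is running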
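
Instead of watching the log by eye, the check can be scripted. The sketch below polls a log file for a marker string. `wait_for_boot` is a hypothetical helper, and the default marker `boot success` is an assumption about the wording of the startup message; pass a different marker as the third argument if the log on your machine reads differently.

```shell
# Hypothetical helper: poll a log file until it contains a marker string.
# Usage: wait_for_boot <logfile> [timeout_seconds] [marker]
wait_for_boot() {
    logfile=$1
    timeout_s=${2:-30}
    marker=${3:-"boot success"}   # assumed phrase in the startup message
    waited=0
    while [ "$waited" -lt "$timeout_s" ]; do
        if grep -q "$marker" "$logfile" 2>/dev/null; then
            echo "started"
            return 0
        fi
        sleep 1
        waited=$((waited + 1))
    done
    echo "no '$marker' line in $logfile after ${timeout_s}s"
    return 1
}
```

For example, `wait_for_boot nohup.out 30` returns as soon as the marker appears, or fails after 30 seconds.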

 

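3) My server is an Alibaba Cloud instance with 2 GB of memory, so after starting mqnamesrv the JVM memory size has to be reduced before starting mqbroker; otherwise the broker fails with an error.

vim runbroker.sh (the mqbroker script invokes runbroker.sh:
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@) and lower the JVM heap sizes (the -Xms, -Xmx and -Xmn values in JAVA_OPT) to fit the available memory.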
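
The same edit can be made without opening an editor. This is a sketch under stated assumptions: that the script's JAVA_OPT line carries `-Xms`, `-Xmx` and `-Xmn` flags, and that GNU sed (as on CentOS) is available for `-i`. `set_heap` is a hypothetical helper name, and the sizes in the example are illustrative values for a 2 GB host, not the ones from the original post.

```shell
# Hypothetical helper: rewrite the -Xms/-Xmx/-Xmn values in a start script in place.
# Usage: set_heap <script> <xms> <xmx> <xmn>
set_heap() {
    sed -i \
        -e "s/-Xms[0-9][0-9]*[mgMG]/-Xms$2/" \
        -e "s/-Xmx[0-9][0-9]*[mgMG]/-Xmx$3/" \
        -e "s/-Xmn[0-9][0-9]*[mgMG]/-Xmn$4/" \
        "$1"
}

# Example (illustrative sizes; keep a backup of the script first):
# cp runbroker.sh runbroker.sh.bak
# set_heap runbroker.sh 256m 256m 128m
```

Only the first match of each flag per line is rewritten, which is enough when each flag appears once in JAVA_OPT.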

 


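Then start mqbroker and write its startup log to a chosen location. From the apache-rocketmq/bin directory (the redirection must come before the trailing &, otherwise it is not applied to mqbroker):

nohup mqbroker >/var/log/mq.log 2>&1 &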

  

   After a successful start, ps aux|grep rocketmq lists the running RocketMQ processes.

 

 

 

 

4) Write test cases

 

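  1. Add the RocketMQ dependency to pom.xml (the 4.1 client package is used here; the 4.2 entry below could not be used after being added. Note that it points at rocketmq-all with type pom rather than at the rocketmq-client jar.)

<!-- 4.2: could not be used -->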
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0</version>
<type>pom</type>
</dependency>
<!-- 4.1 client dependency -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>


Producer main method:
package cn.rocketmq;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;


/**
 * Create by fan on 2018/4/16
 */
public class TestProductRocketMq {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producerGroupName");
        // defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setNamesrvAddr("47.98.111.19:9876");

        // Let a startup failure propagate instead of swallowing it and sending anyway.
        defaultMQProducer.start();

        try {
            for (int i = 0; i < 100; i++) {
                // Alternate between tagA and tagB messages.
                Message message = (i % 2 == 0)
                        ? new Message("testTopic", "tagA", "keyA", "Hello RocketMq".getBytes())
                        : new Message("testTopic", "tagB", "keyB", "Hello RocketMq.I'm your user".getBytes());
                SendResult sendResult = defaultMQProducer.send(message);
                Thread.sleep(100);
                System.out.println("tags send result:" + sendResult);
            }
        } finally {
            // Release the producer's resources whether or not sending succeeded.
            defaultMQProducer.shutdown();
        }
    }
}

 






 

Consumer method:

package cn.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Create by fan on 2018/4/16
 */
public class TestConsumeRocketMq {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("testProducerGroupName");
        defaultMQPushConsumer.setNamesrvAddr("47.98.111.19:9876");


        defaultMQPushConsumer.subscribe("testTopic","tagA || tagB");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(Thread.currentThread().getName() + "Receive new message:" + list);
                MessageExt messageExt = list.get(0);
                System.out.println("messageExt:" + messageExt);
                if(messageExt!=null && "testTopic".equals(messageExt.getTopic())){

                    if("tagA".equals(messageExt.getTags())){

                        String mess = new String(messageExt.getBody());
                        System.out.println("mess tagA consume:" + mess);
                    }else if("tagB".equals(messageExt.getTags())){
                        String mess = new String(messageExt.getBody());
                        System.out.println("mess tagB consume:" + mess);
                    }
                }
                // Acknowledge the message as consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultMQPushConsumer.start();

        System.out.println("Consume start.");

    }
}

 

 

 

Running the main method afterwards fails with this error:

 

RocketMq Exception "connect to <:10909> failed"

 

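The fix: the CentOS firewall was blocking ports 10909 and 9876, so open them and reload the firewall so the permanent rules take effect:

 firewall-cmd --zone=public --add-port=10909/tcp --permanent

 firewall-cmd --zone=public --add-port=9876/tcp --permanent

 firewall-cmd --reload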
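
To confirm from the client machine that the ports are now reachable, a small probe helps separate firewall problems from RocketMQ problems. This is a minimal sketch: `port_open` and `check_rocketmq_ports` are hypothetical helper names, the probe relies on bash's `/dev/tcp` redirection and the coreutils `timeout` command, and port 10911 is added on the assumption that the broker uses its default listen port (the original post only mentions 9876 and 10909).

```shell
# Hypothetical helper: succeed if a TCP connection to host:port opens within 3 seconds.
port_open() {
    timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Report each port on its own line.
# 10911 is an assumption: the broker's default listen port.
check_rocketmq_ports() {
    for port in 9876 10909 10911; do
        if port_open "$1" "$port"; then
            echo "$port reachable"
        else
            echo "$port unreachable"
        fi
    done
}

# Example, run from the client machine:
# check_rocketmq_ports your-server-ip
```

On a cloud host, a port that stays unreachable after the firewall change may also be blocked by the provider's own security-group rules.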

 
After that, I ran into another problem:
 

 

 

I googled for a long time and have not solved it yet...

 

  

 
