Linux 下 RocketMQ 安装、配置与运维(详细讲解)

qyhua 2024-10-14 15:07:01 阅读 61

一  RocketMQ 下载安装

1 下载 RocketMQ:        

    下载当前最新版本RocketMQ

   官网下载:    https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip

<code>wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-al l-5.3.0-bin-release.zip

  执行下载图:

  下载成功图:

2 安装RocketMQ

  安装过程非常简单,解压RocketMQ压缩包即可

<code>unzip rocketmq-all-5.3.0-bin-release.zip

    解压过程中: 

3 验证安装

1 启动NameServer

  以后台启动NameServer服务:

<code>nohup sh bin/mqnamesrv &

  执行后看到创建了个后台进程,但此时并无法看到日记

打开日记查看执行效果:

<code>tail -f nohup.out

2 启动Broker

启动Broker 可以加上--enable-proxy 方式启动代理,也可正常启动不使用代理,如下:

<code># 开启代理方式启动

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# 默认不使用代理方式启动

nohup sh bin/mqbroker -n localhost:9876

启动成功如下图: 

启动成功, 看一下brokerIP xx.xx.xx.xx10911 如果是内网IP外网是无法访问的,需要配置外网IP,云服务器如果使用默认配置一般是内网IP

 

配置broker外网IP

 rocketMQ主目录\conf\broker.conf

<code>brokerIP1=外网IP地址

 增加后如下图: 

3 测试连通

  使用自带工具验证本地环境:

  生产端发送测试

<code>#先设置工具依赖变量

export NAMESRV_ADDR=localhost:9876

#测试生产端发送

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

执行如下: 

 

从上图中可以看到send_ok,说明生产端已正常发送信息到队列。

测试接收端:

<code>sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

执行后则到队列的信息如下图:

按ctrl+c结束接收测试。 

从上面看,本地生产发送与接收数据正常,基本可以判断本地安装正常。

注意事项

broket 启动时默认启动脚本内存参数是使用8G内存。如果您的内存足够可以继续增加,如果内存有限则要缩小, 如果内存小于8G可能存在报错:

修改默认内存,文件位置:bin/runbroker.sh,如下图:

用vi打开脚本,找到配置内存参数如下图:

配置项:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g",根据实际内存业务情况,变更-Xms8g -Xmx8g参数大小即可。

二 JAVA客户端连接

1 mvn 项目引入依赖如下:

<code> <dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>4.3.0</version>

</dependency>

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-acl</artifactId>

<version>4.5.2</version>

</dependency>

2 生产发送端java代码:

 java生产端发送消息代码:

import org.apache.logging.log4j.LogManager;

import org.apache.logging.log4j.Logger;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendCallback;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import org.apache.rocketmq.remoting.exception.RemotingException;

import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;

import java.io.UnsupportedEncodingException;

import java.nio.charset.StandardCharsets;

@Service

public class MQRocketServiceImpl {

private DefaultMQProducer producer;

private static final Logger logger = LogManager.getLogger(MQRocketServiceImpl.class);

@PostConstruct

public void initProducer() throws MQClientException {

producer = new DefaultMQProducer("CONSUMER_GROUP");

producer.setNamesrvAddr(xx.xx.xx.xx:9876);

producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());

producer.start();

}

@PreDestroy

public void shutdownProducer() {

if (producer != null) {

producer.shutdown();

}

}

public boolean sendMsg(String text, String key) {

try {

Message msg = new Message(

MqCfg.TOPIC,

MqCfg.SUB_EXPRESSION,

key,

text.getBytes(StandardCharsets.UTF_8) // 使用标准字符集

);

producer.send(msg, new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

logger.info("Message sent successfully: {}", sendResult);

}

@Override

public void onException(Throwable e) {

logger.error("Failed to send message", e);

}

});

return true;

} catch (UnsupportedEncodingException | MQClientException | RemotingException | InterruptedException e) {

logger.error("Error sending message", e);

return false;

}

}

}

执行后发送成功打印结果:

 3 服务端接收代码:

 java服务端接收代码

<code>

import org.apache.logging.log4j.LogManager;

import org.apache.logging.log4j.Logger;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Service;

import java.util.List;

@Service

public class MsgReceiveServiceImpl implements ApplicationRunner {

@Autowired

private PackageHandlerImpl packageHandler;

private static final Logger logger = LogManager.getLogger(MsgReceiveServiceImpl.class);

@Override

public void run(ApplicationArguments args) {

receiveQueue();

}

private void receiveQueue() {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqCfg.CONSUMER_GROUP);

consumer.setNamesrvAddr("xx.xx.xx.xx:9076");

try {

consumer.subscribe(MqCfg.TOPIC, MqCfg.SUB_EXPRESSION);

consumer.registerMessageListener((MessageListenerOrderly) this::processMessages);

consumer.start();

logger.info("MQ消费者启动成功。");

} catch (MQClientException e) {

logger.error("MQ消费者启动失败!", e);

throw new RuntimeException("连接MQ错误,启动失败!", e);

}

}

private ConsumeOrderlyStatus processMessages(List<MessageExt> msgs, ConsumeOrderlyContext context) {

for (MessageExt msg : msgs) {

try {

String text = new String(msg.getBody());

//消息处理……

} catch (Exception e) {

logger.error("处理消息出错,key={},错误信息:", msg.getKeys(), e);

// 可以在此处根据业务需求返回 SUSPEND_CURRENT_QUEUE_A_MOMENT,或者选择其他处理方式

}

}

return ConsumeOrderlyStatus.SUCCESS;

}

}

 以上代码实现ApplicationRunner接口,使其作为后台一个线程单独处理消息。

三 RocketMQ 运维

1 RocketMQ面板,可视化管理

RocketMQ与apacheMQ不同,本身没有自带面板查看状态工具,查看队列的状态要依赖命令行这对运维或开发都非常不方便,需要另外安装Rocket MQ 面板工具rocketmq-dashboard:

开源地址:https://github.com/apache/rocketmq-dashboard

下载回来后直接用IDE开发环境运行,也可以打包后放在服务器一起运行,如下图:

开发环境运行视图如下 

除了一般的统计信息,还可以进行管理,功能丰富,如下图:

有此神器作观察,相信运维不再什么难事。

2 RocketMQ变更默认端口

1 修改方式一(4.x ,以前的版本是可以的):

 要更改 RocketMQ 的本地部署中的端口,您需要修改 RocketMQ 的配置文件。RocketMQ 的配置  主要包括 <code>broker.conf 和 namesrv.conf 这两个文件。

找到配置文件

找到 RocketMQ 的安装目录下的 conf 文件夹。在 conf 文件夹中,你会看到 broker.conf 和 namesrv.conf

修改 Broker 端口

打开 broker.conf 文件。查找 listen_port 这一行,这是 Broker 的监听端口,默认通常是 10911。更改 listen_port 的值为所需的端口号。

修改 NameServer 端口

打开 namesrv.conf 文件。查找  NAMESRV_PORT 这一行,这是 NameServer 的监听端口,默认通常是 9876。更改 NAMESRV_PORT 的值为所需的端口号。

2 端口修改方式二(当前最新的版本5.x)

在使用上面的方式修改端口后发现失效,只能查看源代码:

看NameSrv模块源码发现服务固定是9876,后面通加载参数c 判断配置文件路径加载,如下图:

跟进MixAll.properties2Object,发现只是根据类的参数与类型匹配加载,如下图:

直接看配置参数类:

这次我们要改端口,所以新建个配置文件,只需增加端口配置即可:

文件内容就一个字段: listenPort = xxxx 端口号,如下图:

启动测试,修改成功如下图:

<code># 启动名称服务 xxx.conf是配置文件路径,可以使用相对路径

nohup sh bin/mqnamesrv -c xxx.conf &

查看了 broker 模块源码发现启动也是一样,所以端口修改方式也是同上 ,修改后启动成功如下图:

3 启停脚本

  1 启动RocketMQ脚本:

  启动脚本,只需执行脚本就可以快速启动MQ,以下是启动脚本代码如下:

<code>#!/bin/bash

# 启动 Nameserver

echo 'Starting MQ NameServer...'

nohup sh bin/mqnamesrv > mq.log 2>&1 &

sleep 5

# 检查 Nameserver 是否启动成功

if ps aux | grep -v grep | grep -q 'mqnamesrv'; then

echo 'MQ NameServer started successfully.'

else

echo 'Failed to start MQ NameServer.'

exit 1

fi

# 启动 Broker

echo 'Starting MQ Broker...'

nohup sh bin/mqbroker -n 0.0.0.0:9876 >> mq.log 2>&1 &

sleep 5

# 检查 Broker 是否启动成功

if ps aux | grep -v grep | grep -q 'mqbroker'; then

echo 'MQ Broker started successfully.'

else

echo 'Failed to start MQ Broker.'

exit 1

fi

# 显示日志

tail -f mq.log

执行后同时打印日记,退出只需按ctrl+c即可。执行启动脚本成功如下图:

  2 停止脚本: 

  停止RocketMQ脚本:

<code>#!/bin/bash

# 关闭 Nameserver

echo 'Closing MQ NameServer...'

sh bin/mqshutdown namesrv

# 检查 Nameserver 是否成功关闭

sleep 5

if ! ps aux | grep -v grep | grep -q 'mqnamesrv'; then

echo 'MQ NameServer closed successfully.'

else

echo 'Failed to close MQ NameServer.'

exit 1

fi

# 关闭 Broker

echo 'Closing MQ Broker...'

sh bin/mqshutdown broker

# 检查 Broker 是否成功关闭

sleep 20

if ! ps aux | grep -v grep | grep -q 'mqbroker'; then

echo 'MQ Broker closed successfully.'

else

echo 'Failed to close MQ Broker.'

exit 1

fi

执行停止脚本成功停止,如下图:

注意:停止broker服务花的时间通常比较长,如果显示停止失败可以多次调用或者在脚本延长等待时间即可。 



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。