Linux搭建RocketMQ,Springboot集成
MQ在平时的开发,架构搭建中至关重要,可以做到削峰填谷,异步处理,本文主要描述MQ单体搭建和Springboot集成的基本使用
准备材料
1.Linux服务器一台
2.JDK1.8+
3.4G以上的磁盘空间(broker需要)
1.下载解压rocketMq(直接下载可运行bin文件)
1.1 下载
# cd /opt
# mkdir rocketmq
# cd rocketmq
# wget https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
1.2 解压
# unzip rocketmq-all-4.9.2-bin-release.zip
# cd rocketmq-4.9.2
2.修改配置文件,开启鉴权
2.1 修改namesrv.conf
文件,设置nameSrv端口
# vim namesrv.conf
#设置端口
listenPort=port1
2.2 修改broker.conf
文件
# cd /opt/rocketmq/rocketmq-4.9.2/conf
# vim broker.conf
#设置nameSrv端口,x.x.x.x为外网IP
namesrvAddr = x.x.x.x:port1
brokerIP1 = x.x.x.x
#设置监听端口
listenPort = port2
#开启acl鉴权,开启了之后plain_acl.yml才生效
aclEnable=true
2.3 修改plain_acl.yml
文件
# vim plain_acl.yml
#设置账户ak/sk
accounts:
- accessKey: rocketMq-Server
secretKey: server-password
whiteRemoteAddress:
admin: false
defaultTopicPerm: PUB
defaultGroupPerm: PUB
topicPerms:
- Topic1=PUB
# - topicB=PUB|SUB
# - topicC=SUB
groupPerms:
# the group should convert to retry topic
- Group1=PUB
# - groupB=PUB|SUB
# - groupC=SUB
- accessKey: rocketMq-Client
secretKey: client-password
whiteRemoteAddress:
admin: false
defaultTopicPerm: SUB
defaultGroupPerm: SUB
topicPerms:
- Topic1=SUB
# - topicB=PUB|SUB
# # - topicC=SUB
# groupPerms:
# # the group should convert to retry topic
# - Layout=PUB
2.4 添加startMq,stopMq执行命令
# cd /opt/rocketmq/rocketmq-4.9.2
# vim start_mq.sh
输入以下命令
nohup sh bin/mqnamesrv -c conf/namesrv.conf &
nohup sh bin/mqbroker -n x.x.x.x:port1 -c conf/broker.conf autoCreateTopicEnable=true &
echo "启动MQ完毕."
# vim stop_mq.sh
输入以下命令
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
echo "停止MQ完毕"
# chmod 777 start_mq.sh
# chmod 777 stop_mq.sh
启动MQ
# ./start_mq.sh
打印: 启动MQ完毕.
查看日志:
The Name Server boot success. serializeType=JSON
The broker[broker-a, x.x.x.x:port2] boot success. serializeType=JSON and name server is x.x.x.x:port1
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
load config properties file OK, conf/namesrv.conf
The Name Server boot success. serializeType=JSON
The broker[broker-a, x.x.x.x:port2] boot success. serializeType=JSON and name server is x.x.x.x:port1
关闭MQ
# ./stop_mq.sh
打印:停止MQ完毕
3.Springboot集成
3.1 添加maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
3.2 添加RocketMQ配置
rocketmq:
name-server: x.x.x.x:port1
producer:
access-key: RocketMq-Server
secret-key: server-password
send-message-timeout: 30000
retry-times-when-send-async-failed: 3
retry-times-when-send-failed: 3
group: Group1
3.3 生产者代码样例
@Slf4j
@Component
public class MqSender {
private static final String TOPIC = "Topic1";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendSyncMsg(String msg,String tag){
SendResult sendResult = rocketMQTemplate.syncSend(TOPIC+":"+tag, MessageBuilder.withPayload(msg).build());
log.info("Send message success,result:{}",JSONObject.toJSONString(sendResult));
return sendResult;
}
public void sendASyncMsg(String msg,String tag){
rocketMQTemplate.asyncSend(TOPIC+":"+tag, MessageBuilder.withPayload(msg).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Send Async message success,result:{}",JSONObject.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
log.info("Send Async message failed,error:{}",throwable.getMessage());
}
});
}
}
3.4 消费者代码样例
@Slf4j
public class MqReceive {
public static String GROUP = "Group1";
public static String NAMEADDR = "x.x.x.x:port1";
public static String TOPIC = "Topic1";
public static String AK = "rocketMq-Client";
public static String SK = "client-password";
public static String TAG = "tag1";
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP,getAclRpcHook(),new AllocateMessageQueueAveragely());
// Specify name server addresses.
consumer.setNamesrvAddr(NAMEADDR);
// Subscribe one more more topics to consume.
consumer.subscribe(TOPIC, TAG);
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(msg->{
System.out.printf("%s 接收到新消息: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRpcHook(){
return new AclClientRPCHook(new SessionCredentials(AK, SK));
}
}