Installing RocketMQ on Linux and Integrating It with Spring Boot

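A message queue (MQ) is a core building block in everyday development and architecture design: it smooths out traffic peaks (peak shaving and valley filling) and enables asynchronous processing. This article covers a single-node RocketMQ setup and its basic use from Spring Boot.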

Prerequisites

1. One Linux server
2. JDK 1.8+
3. At least 4 GB of free disk space (required by the broker)
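
The prerequisites above can be checked before installing anything. The following is a minimal sketch, not part of RocketMQ: the class name PrereqCheck and the default path /opt are assumptions chosen to match the steps below.

```java
import java.io.File;

/** Illustrative pre-flight check for the prerequisites listed above (hypothetical helper). */
final class PrereqCheck {
    // 4 GB threshold taken from the prerequisites list.
    static final long REQUIRED_BYTES = 4L * 1024 * 1024 * 1024;

    /** Returns true when the usable space is at least the required amount. */
    static boolean hasEnoughSpace(long usableBytes, long requiredBytes) {
        return usableBytes >= requiredBytes;
    }

    /** Maps "1.8.0_292" to 8 and "11.0.2" to 11. */
    static int majorVersion(String javaVersion) {
        String[] parts = javaVersion.split("[._-]");
        int first = Integer.parseInt(parts[0]);
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        // Assumption: RocketMQ will be installed under /opt; pass another path as the first argument if not.
        File target = new File(args.length > 0 ? args[0] : "/opt");
        int major = majorVersion(System.getProperty("java.version"));
        long usable = target.getUsableSpace();
        System.out.println("JDK major version: " + major + (major >= 8 ? " (ok)" : " (too old)"));
        System.out.println("Usable space on " + target + ": " + usable / (1024 * 1024) + " MB"
                + (hasEnoughSpace(usable, REQUIRED_BYTES) ? " (ok)" : " (below 4 GB)"));
    }
}
```

On JDK 11 or later this can be run directly with `java PrereqCheck.java`; on JDK 8 it has to be compiled with javac first.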

1. Download and extract RocketMQ (the prebuilt binary release, ready to run)

1.1 Download

   # cd /opt
   # mkdir rocketmq
   # cd rocketmq
   # wget https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

1.2 Extract

  # unzip rocketmq-all-4.9.2-bin-release.zip
  # cd rocketmq-4.9.2

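2. Edit the configuration files and enable ACL authentication

2.1 Edit namesrv.conf to set the NameServer port

   # cd /opt/rocketmq/rocketmq-4.9.2/conf
   # vim namesrv.conf

# Set the listening port
listenPort=port1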

2.2 Edit broker.conf

   # cd /opt/rocketmq/rocketmq-4.9.2/conf
   # vim broker.conf

# NameServer address; x.x.x.x is the server's public IP
namesrvAddr = x.x.x.x:port1
brokerIP1 = x.x.x.x
# Broker listening port
listenPort = port2
# Enable ACL authentication; plain_acl.yml only takes effect when this is true
aclEnable=true
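
broker.conf uses the plain key=value properties format, so a typo in a key name is easy to miss. The sketch below checks that the four keys set above are present. It is only an illustration: the class name BrokerConfCheck is hypothetical, and the sample address 192.0.2.10 and ports 9876 and 10911 are placeholder assumptions standing in for x.x.x.x, port1 and port2.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Illustrative sanity check for the broker.conf keys used in this article (hypothetical helper). */
final class BrokerConfCheck {
    static final String[] REQUIRED_KEYS = {"namesrvAddr", "brokerIP1", "listenPort", "aclEnable"};

    /** Parses properties-style text; lines starting with '#' are treated as comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values (assumptions); substitute your own IP and ports.
        String sample = "# sample broker.conf\n"
                + "namesrvAddr = 192.0.2.10:9876\n"
                + "brokerIP1 = 192.0.2.10\n"
                + "listenPort = 10911\n"
                + "aclEnable=true\n";
        Properties props = parse(sample);
        System.out.println("Missing keys: " + missingKeys(props));
        System.out.println("ACL enabled: " + Boolean.parseBoolean(props.getProperty("aclEnable")));
    }
}
```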

2.3 Edit plain_acl.yml

  # vim plain_acl.yml

# Accounts and their access key (AK) / secret key (SK)
accounts:
- accessKey: rocketMq-Server
  secretKey: server-password
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: PUB
  defaultGroupPerm: PUB
  topicPerms:
  - Topic1=PUB
#  - topicB=PUB|SUB
#  - topicC=SUB
  groupPerms:
  # the group should convert to retry topic
  - Group1=PUB
#  - groupB=PUB|SUB
#  - groupC=SUB

- accessKey: rocketMq-Client
  secretKey: client-password
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: SUB
  defaultGroupPerm: SUB
  topicPerms:
  - Topic1=SUB
#  - topicB=PUB|SUB
#  - topicC=SUB
#  groupPerms:
#  # the group should convert to retry topic
#  - Layout=PUB
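
To make the two accounts above easier to reason about, here is a small model of how the permission strings can be read: an explicit topicPerms entry is used when one exists for the topic, otherwise defaultTopicPerm applies. This is a simplified sketch for illustration, not the broker's actual implementation; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the PUB / SUB / PUB|SUB / DENY strings used in plain_acl.yml (hypothetical helper). */
final class AclPermSketch {

    /** Returns true if a permission string such as "PUB", "SUB", "PUB|SUB" or "DENY" grants the action. */
    static boolean grants(String perm, String action) {
        if (perm == null) {
            return false;
        }
        for (String part : perm.split("\\|")) {
            if (part.trim().equals(action)) {
                return true;
            }
        }
        return false;
    }

    /** Uses the per-topic entry when present, otherwise falls back to the default permission. */
    static boolean canAccessTopic(Map<String, String> topicPerms, String defaultPerm,
                                  String topic, String action) {
        return grants(topicPerms.getOrDefault(topic, defaultPerm), action);
    }

    public static void main(String[] args) {
        // Mirrors the rocketMq-Server account above: Topic1=PUB, defaultTopicPerm PUB.
        Map<String, String> serverPerms = new HashMap<>();
        serverPerms.put("Topic1", "PUB");
        System.out.println("server may publish to Topic1: "
                + canAccessTopic(serverPerms, "PUB", "Topic1", "PUB"));
        System.out.println("server may subscribe to Topic1: "
                + canAccessTopic(serverPerms, "PUB", "Topic1", "SUB"));
    }
}
```

Read this way, the rocketMq-Server account can only publish and the rocketMq-Client account can only subscribe, which matches the producer and consumer roles used later in this article.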

2.4 Add the start_mq.sh and stop_mq.sh scripts

# cd /opt/rocketmq/rocketmq-4.9.2
# vim start_mq.sh

Enter the following commands:
nohup sh bin/mqnamesrv -c conf/namesrv.conf &
nohup sh bin/mqbroker -n x.x.x.x:port1 -c conf/broker.conf autoCreateTopicEnable=true &
echo "MQ started."

# vim stop_mq.sh
Enter the following commands:
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
echo "MQ stopped."

# chmod +x start_mq.sh
# chmod +x stop_mq.sh

Start MQ
# ./start_mq.sh

Output: MQ started.
The echo only confirms that both processes were launched in the background, so check the log to confirm that they actually booted:
The Name Server boot success. serializeType=JSON
The broker[broker-a, x.x.x.x:port2] boot success. serializeType=JSON and name server is x.x.x.x:port1
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
load config properties file OK, conf/namesrv.conf
The Name Server boot success. serializeType=JSON
The broker[broker-a, x.x.x.x:port2] boot success. serializeType=JSON and name server is x.x.x.x:port1

Stop MQ
# ./stop_mq.sh
Output: MQ stopped.
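
While the NameServer and broker are running, it is worth confirming from the client machine that both ports are reachable, since a firewall or a wrong brokerIP1 is a common reason for a client failing to connect. The sketch below does a plain TCP connect. The class name PortCheck is hypothetical and the addresses are whatever you pass on the command line; a successful connect only shows that the port is open, not that RocketMQ is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustrative TCP reachability check for the NameServer and broker ports (hypothetical helper). */
final class PortCheck {

    /** Splits "host:port" into an unresolved socket address. */
    static InetSocketAddress parseAddress(String hostAndPort) {
        int idx = hostAndPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostAndPort);
        }
        String host = hostAndPort.substring(0, idx);
        int port = Integer.parseInt(hostAndPort.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Build a fresh address so the host name is resolved at connect time.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Usage: java PortCheck.java <host:port> [<host:port> ...]");
            return;
        }
        for (String arg : args) {
            InetSocketAddress address = parseAddress(arg);
            System.out.println(arg + " reachable: " + isReachable(address, 3000));
        }
    }
}
```

A typical invocation passes the NameServer address and the broker address, that is x.x.x.x:port1 and x.x.x.x:port2 with the real values filled in.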


3. Spring Boot integration

3.1 Add the Maven dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

3.2 Add the RocketMQ configuration

rocketmq:
  name-server: x.x.x.x:port1
  producer:
    # Must match the accessKey in plain_acl.yml exactly (case-sensitive)
    access-key: rocketMq-Server
    secret-key: server-password
    send-message-timeout: 30000
    retry-times-when-send-async-failed: 3
    retry-times-when-send-failed: 3
    group: Group1
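3.3 Producer example

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqSender {
    private static final String TOPIC = "Topic1";
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // Sends synchronously and blocks until the broker acknowledges; the destination format is "topic:tag".
    public SendResult sendSyncMsg(String msg, String tag) {
        SendResult sendResult = rocketMQTemplate.syncSend(TOPIC + ":" + tag, MessageBuilder.withPayload(msg).build());
        log.info("Send message success,result:{}", JSONObject.toJSONString(sendResult));
        return sendResult;
    }

    // Sends asynchronously; the result is delivered to the callback.
    public void sendASyncMsg(String msg, String tag) {
        rocketMQTemplate.asyncSend(TOPIC + ":" + tag, MessageBuilder.withPayload(msg).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Send Async message success,result:{}", JSONObject.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                // Log at error level with the stack trace instead of printStackTrace().
                log.error("Send Async message failed", throwable);
            }
        });
    }
}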
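
The producer above passes a destination of the form topic:tag to RocketMQTemplate. A tiny helper makes that convention explicit and avoids a dangling colon when no tag is given. This is a sketch only; the Destination class and its methods are hypothetical and not part of rocketmq-spring.

```java
/** Illustrative helper for the "topic:tag" destination convention (hypothetical, not a library class). */
final class Destination {

    /** Builds "topic:tag", or just "topic" when the tag is null or empty. */
    static String of(String topic, String tag) {
        return (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;
    }

    /** Splits a destination into {topic, tag}; the tag is "" when absent. */
    static String[] split(String destination) {
        String[] parts = destination.split(":", 2);
        return new String[] {parts[0], parts.length > 1 ? parts[1] : ""};
    }

    public static void main(String[] args) {
        String destination = of("Topic1", "tag1");
        String[] parts = split(destination);
        System.out.println(destination + " -> topic=" + parts[0] + ", tag=" + parts[1]);
    }
}
```

In MqSender this would replace the inline string concatenation, for example `rocketMQTemplate.syncSend(Destination.of(TOPIC, tag), ...)`.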

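3.4 Consumer example

import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

@Slf4j
public class MqReceive {
    private static final String GROUP = "Group1";
    private static final String NAMEADDR = "x.x.x.x:port1";
    private static final String TOPIC = "Topic1";
    // AK/SK of the subscribe-only account defined in plain_acl.yml
    private static final String AK = "rocketMq-Client";
    private static final String SK = "client-password";

    private static final String TAG = "tag1";

    public static void main(String[] args) throws MQClientException {
        // Instantiate with the consumer group name, the ACL hook and the queue allocation strategy.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP, getAclRpcHook(), new AllocateMessageQueueAveragely());

        // Specify name server addresses.
        consumer.setNamesrvAddr(NAMEADDR);

        // Subscribe to one or more topics to consume.
        consumer.subscribe(TOPIC, TAG);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> {
                    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
                    System.out.printf("%s received new message: %s %n", Thread.currentThread().getName(),
                            new String(msg.getBody(), StandardCharsets.UTF_8));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

    // Builds the hook that signs every request with the client account's AK/SK.
    static RPCHook getAclRpcHook() {
        return new AclClientRPCHook(new SessionCredentials(AK, SK));
    }
}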
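
The second argument of consumer.subscribe is a tag expression, not just a single tag: "*" subscribes to every tag, and several tags can be combined with "||", as in "tag1 || tag2". The sketch below models that matching rule so the effect of an expression can be checked without a broker. It is a simplified illustration of the semantics rather than RocketMQ's own filter code, and the class name is hypothetical.

```java
/** Simplified model of tag-expression matching ("*", "tagA", "tagA || tagB"); hypothetical helper. */
final class TagFilterSketch {

    /** Returns true if a message carrying the given tag would pass the subscription expression. */
    static boolean matches(String expression, String tag) {
        // A null, empty or "*" expression means "subscribe to all tags".
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("tag1 vs 'tag1': " + matches("tag1", "tag1"));
        System.out.println("tag2 vs 'tag1': " + matches("tag1", "tag2"));
        System.out.println("tag2 vs 'tag1 || tag2': " + matches("tag1 || tag2", "tag2"));
    }
}
```

With the consumer above subscribed to "tag1", a message sent through MqSender with any other tag is never delivered to it, which is worth remembering when a message seems to go missing.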