Linux服务器Centos系统使用开源的EMQX搭建MQTT

EMQ X 消息服务器简介

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。

Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。

MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:

稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

本文描述单点使用EMQX搭建单点MQTT推送服务器

1.安装emqx

1.1 安装所需要的依赖包
   # yum install -y yum-utils device-mapper-persistent-data lvm2
1.2 设置稳定存储库,以 CentOS 7 为例
  # yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
1.3 安装最新版本的 EMQ X Broker
# yum install emqx

2.启动关闭emqx

2.1 启动
# emqx start
emqx 4.0.0 is started successfully!

# emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
2.2 关闭
   # emqx stop
OK

3.修改密码

3.1 修改最简单密码,mnesia认证
# vim /etc/emqx/plugins/emqx_auth_mnesia.conf

auth.mnesia.password_hash = plain

auth.user.1.username = MYUSER
auth.user.1.password = MYPASSWORD
3.2 开启mnesia认证

访问ip:18083开启mnesia认证

4.生产者消费者代码示例

4.1 Maven导入MQTT依赖
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
4.2 生产者代码示例
public class MqttProducer {

    private static int qos = 2;
    private static String broker = "tcp://x.x.x.x:1883";
    private static String clientId = "clientId";
    private static String userName = "MYUSER";
    private static String password = "MYPASSWORD";
    public static void sendMsg(String content, String pubTag) {
        MemoryPersistence persistence = new MemoryPersistence();
        String pubTopic = "TOPIC1/"+pubTag;
        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT connection option
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
//             retain session
            connOpts.setCleanSession(true);


            // establish a connection
            System.out.println("连接broker: " + broker);
            client.connect(connOpts);
            System.out.println("发送消息: " + content);
            // Required parameters for message publishing
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("消息已发送");

            client.disconnect();
            System.out.println("消息发送完毕,断开连接");
            client.close();
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
4.2 消费者代码示例
public class MqttConsumer {
    public static void main(String[] args) {
        String broker = "tcp://x.x.x.x:1883";
        String clientId = "cliendId1";
        String subTag = clientId;
        String subTopic = "TOPIC1/"+subTag;
        String userName = "MYUSER";
        String password = "MYPASSWORD";

        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT connection option
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
            // retain session
//            connOpts.setCleanSession(true);

            // set callback
            client.setCallback(new OnMessageCallback());

            // establish a connection
            System.out.println("连接broker: " + broker);
            client.connect(connOpts);

            System.out.println("消费者已连接");

            // Subscribe
            client.subscribe(subTopic);
//
//            // Required parameters for message publishing
//            MqttMessage message = new MqttMessage(content.getBytes());
//            message.setQos(qos);
//            client.publish(pubTopic, message);
//            System.out.println("Message published");
//
//            client.disconnect();
//            System.out.println("Disconnected");
//            client.close();
//            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
Table of Contents