Linux服务器Centos系统使用开源的EMQX搭建MQTT
EMQ X 消息服务器简介
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。
Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。
MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。
EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
本文描述单点使用EMQX搭建单点MQTT推送服务器
1.安装emqx
1.1 安装所需要的依赖包
# yum install -y yum-utils device-mapper-persistent-data lvm2
1.2 设置稳定存储库,以 CentOS 7 为例
# yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
1.3 安装最新版本的 EMQ X Broker
# yum install emqx
2.启动关闭emqx
2.1 启动
# emqx start
emqx 4.0.0 is started successfully!
# emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
2.2 关闭
# emqx stop
OK
3.修改密码
3.1 修改最简单密码,mnesia认证
# vim /etc/emqx/plugins/emqx_auth_mnesia.conf
auth.mnesia.password_hash = plain
auth.user.1.username = MYUSER
auth.user.1.password = MYPASSWORD
3.2 开启mnesia认证
访问ip:18083
开启mnesia认证
4.生产者消费者代码示例
4.1 Maven导入MQTT依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
4.2 生产者代码示例
public class MqttProducer {
private static int qos = 2;
private static String broker = "tcp://x.x.x.x:1883";
private static String clientId = "clientId";
private static String userName = "MYUSER";
private static String password = "MYPASSWORD";
public static void sendMsg(String content, String pubTag) {
MemoryPersistence persistence = new MemoryPersistence();
String pubTopic = "TOPIC1/"+pubTag;
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT connection option
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// retain session
connOpts.setCleanSession(true);
// establish a connection
System.out.println("连接broker: " + broker);
client.connect(connOpts);
System.out.println("发送消息: " + content);
// Required parameters for message publishing
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("消息已发送");
client.disconnect();
System.out.println("消息发送完毕,断开连接");
client.close();
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
4.2 消费者代码示例
public class MqttConsumer {
public static void main(String[] args) {
String broker = "tcp://x.x.x.x:1883";
String clientId = "cliendId1";
String subTag = clientId;
String subTopic = "TOPIC1/"+subTag;
String userName = "MYUSER";
String password = "MYPASSWORD";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT connection option
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// retain session
// connOpts.setCleanSession(true);
// set callback
client.setCallback(new OnMessageCallback());
// establish a connection
System.out.println("连接broker: " + broker);
client.connect(connOpts);
System.out.println("消费者已连接");
// Subscribe
client.subscribe(subTopic);
//
// // Required parameters for message publishing
// MqttMessage message = new MqttMessage(content.getBytes());
// message.setQos(qos);
// client.publish(pubTopic, message);
// System.out.println("Message published");
//
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}