mqtt客户端源代码(开源MQTT产品emq使用有java源码)
mqtt客户端源代码(开源MQTT产品emq使用有java源码)客户端很丰富,我主要是做java的,所以用java做的测试这控制台做的还有点好看呢停止emqttd服务: /usr/local/emqttd/bin/emqttd stop重启emqttd服务: /usr/local/emqttd/bin/emqttd restart测试是否成功:启动后在浏览器输入:http://主机ip或域名地址:18083 访问账号 admin ,密码 public
安装:比较简单,官网介绍的很详细
先下载emqttd网站下载软件:http://www.emqtt.com/downloads ,
请根据服务器系统型号下载稳定版,如centos7系统,请下载emqttd-centos7-v2.3.11.zip,下载后解压后直接放在软件运行目录,一般是放在:/usr/local/emqttd目录,
启动emqttd服务:/usr/local/emqttd/bin/emqttd start
停止emqttd服务: /usr/local/emqttd/bin/emqttd stop
重启emqttd服务: /usr/local/emqttd/bin/emqttd restart
测试是否成功:启动后在浏览器输入:http://主机ip或域名地址:18083 访问账号 admin ,密码 public
这控制台做的还有点好看呢
客户端:客户端很丰富,我主要是做java的,所以用java做的测试
springboot开发的
发送消息:
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PublishMessage { private static int QOS = 1; /** * 注:此处的tcp端口默认是1883 */ private static String HOST = "tcp://192.168.230.129:1883"; private static String userName = "admin"; private static String password = "public"; private static MqttClient connect(String clientId String userName String password) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttconnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); MqttClient client = new MqttClient(HOST clientId persistence); client.setCallback(new PushCallback()); client.connect(options); return client; } private static void publish(MqttClient client String msg String topic) throws MqttException { MqttMessage message = new MqttMessage(msg.getBytes()); message.setQos(QOS); message.setRetained(false); client.publish(topic message); } public static void start(String msg) throws MqttException { /*String msg = "Hello !";*/ String clientId = "ServerId_01"; String topic = "MQTT_TOPIC"; MqttClient client = connect(clientId userName password); if (client != null) { publish(client msg topic); System.out.println("Start-----Public Message:" msg); } if (client != null) { client.disconnect(); } } }
方便测试,来个rest接口
import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class MQTTServerController { @RequestMapping("/") public String sayHello() { return "Hello !"; } @RequestMapping("/send") public void send(String msg) throws MqttException { PublishMessage.start(msg); } }
消费消息:
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class ReceiveMessage { private static int QOS = 1; /** * 注:此处的TCP端口默认是1883 */ private static String HOST = "tcp://192.168.230.129:1883"; private static String userName = "admin"; private static String password = "public"; private static MqttClient connect(String clientId) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setKeepAliveInterval(20); options.setConnectionTimeout(10); MqttClient client = new MqttClient(HOST clientId persistence); client.setCallback(new PushCallback()); client.connect(options); return client; } public static void receive(MqttClient client String topic) throws MqttException { int[] Qos = {QOS}; String[] topics = {topic}; client.subscribe(topics Qos); } public static void start() throws MqttException { String clientId = "ClientId_01"; String topic = "MQTT_TOPIC"; MqttClient client = connect(clientId); if (client != null) { receive(client topic); } } }
回调类:
在emq里有个回调类特别重要
/** * 发布消息的回调类 * <p> * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * <p> * public void messageArrived(MqttTopic topic MqttMessage message)接收已经预订的发布。 * <p> * public void connectionLost(Throwable cause)在断开连接时调用。 * <p> * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 */ public class PushCallback implements MqttCallback { @Override public void connectionLost(Throwable throwable) { /** * 连接丢失后,一般在这里面进行重连 */ System.out.println("连接断开,可以做重连"); } @Override public void messageArrived(String topic MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("Server 接收消息主题 : " topic); System.out.println("Server 接收消息Qos : " message.getQos()); System.out.println("Server 接收消息内容 : " new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" token.isComplete()); } }
发送消息用到deliveryComplete方法,消费消息用到messageArrived方法
启动测试即可
GitHub源码:https://github.com/jedyang/mqtt_demo