使用SpringBoot开发MQTT应用_简单示例
一、pom依赖
注意,此处仅展示mqtt所需依赖,实际项目中还需要SpringBoot相关依赖才能运行,此处未写。
TODO: 超链接到搭建SpringBoot项目简单示例
<!-- MQTT依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.1</version>
</dependency>
<!-- 生成/解析JSON依赖(非必须) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
二、配置文件
在resources中创建或修改application.properties配置文件
# MQTT Config
com.mqtt.url = tcp://42.193.97.163:1883
com.mqtt.inBoundClientId = java_client_id
com.mqtt.outBoundClientId = java_out_client_id
com.mqtt.subTopic = /esp32c3/fish/data/0000000000000000
注意,需要将com.mqtt.url字段中的IP和端口改为MQTT服务器的IP和端口(也可以用域名和端口),clientId是代表客户端ID,此处是固定写死的配置,实际应用中应该随机生成或者使用其他唯一ID来代替,以避免客户端ID重复,subTopic代表需要订阅的主题字符串。(TODO: 暂时不清楚outBoundClientId是干嘛用的)
三、配置类与核心代码
在您的主包下创建config包,并在config包下创建MqttConfiguration类,例如我这个项目的类所在位置是com.zonhor.config下。
下面这段代码演示了如何实现最基础的消息收发功能,实际应用中应将配置代码和业务代码分开,此处仅为了演示功能的实现才将配置类和业务类放到了一起使用。
package com.zonhor.config;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MqttConfiguration {
@Value("${com.mqtt.url}")
private String url;
@Value("${com.mqtt.username}")
private String username;
@Value("${com.mqtt.password}")
private String password;
@Value("${com.mqtt.subTopic}")
private String subTopic;
@Value("${com.mqtt.inBoundClientId}")
private String inClientId;
@Value("${com.mqtt.outBoundClientId}")
private String outClientId;
@Resource
private MyGateway myGateway;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(new String[]{url});
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
/**
* InBound Begin 消息接收端
****/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
// 此处将客户端ID后拼接了一个当前系统时间,为避免多个客户端ID重复
String clientid = inClientId + "_" + System.currentTimeMillis();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(url, clientid);
// 配置MQTT客户端
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(0);
adapter.setOutputChannel(mqttInputChannel());
// 添加订阅,如果想订阅多个主题,可以重复下面这一行代码
adapter.addTopic(subTopic);
return adapter;
}
// ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
// 消息消费
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
Map maps = new HashMap();
try {
maps = (Map) JSON.parse(message.getPayload().toString());
} catch (Exception e) {
}
// 收到消息
// 使用topic来判断收到的订阅主题名称
// 使用message.getPayload().toString()来获取收到的具体内容
System.out.println(topic + ":收到消息 " + message.getPayload().toString());
// 发送消息示例
// 此处演示了如果收到订阅消息,再将此消息发送到 /esp32c3/fish/data/cache 主题
myGateway.sendToMqtt(message.getPayload().toString(), "/esp32c3/fish/data/cache");
}
};
}
/** InBound End ****/
/**
* OutBound Begin 消息发送端
****/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/*****
* 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
// 在这里进行mqttOutboundChannel的相关设置
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outClientId, mqttClientFactory());
messageHandler.setAsync(true); // 如果设置成true,发送消息时将不会阻塞。
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String data);
// 指定topic进行消息发送
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
/** OutBound End ****/
}
四、效果演示
检查客户端已连接
我们运行SpringBoot后如果没有报错,就可以在EMQX中看到我们Java客户端的ID了,代表已连接成功。
测试订阅和发布
发布消息
我使用EMQX自带的WebSocket客户端工具进行测试,首先我们发布一条消息,返回springboot客户端控制台可以看到已经成功接收到此消息。
订阅消息
接下来我们在WebSocket客户端工具中订阅/esp32c3/fish/data/cache
主题,然后再次点击下面的发布,我们就可以在”已接收“区域中看到来自SpringBoot客户端发布的相同内容的消息了。