测试

使用SpringBoot开发MQTT应用_简单示例

一、pom依赖

注意,此处仅展示mqtt所需依赖,实际项目中还需要SpringBoot相关依赖才能运行,此处未写。
TODO: 超链接到搭建SpringBoot项目简单示例

<!-- MQTT依赖 -->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <version>5.4.1</version>
</dependency>

<!-- 生成/解析JSON依赖(非必须) -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.51</version>
</dependency>

二、配置文件

在resources中创建或修改application.properties配置文件

# MQTT Config
com.mqtt.url = tcp://42.193.97.163:1883
com.mqtt.inBoundClientId = java_client_id
com.mqtt.outBoundClientId = java_out_client_id
com.mqtt.subTopic = /esp32c3/fish/data/0000000000000000

注意,需要将com.mqtt.url字段中的IP和端口改为MQTT服务器的IP和端口(也可以用域名和端口),clientId是代表客户端ID,此处是固定写死的配置,实际应用中应该随机生成或者使用其他唯一ID来代替,以避免客户端ID重复,subTopic代表需要订阅的主题字符串。(TODO: 暂时不清楚outBoundClientId是干嘛用的)

三、配置类与核心代码

在您的主包下创建config包,并在config包下创建MqttConfiguration类,例如我这个项目的类所在位置是com.zonhor.config下。

下面这段代码演示了如何实现最基础的消息收发功能,实际应用中应将配置代码和业务代码分开,此处仅为了演示功能的实现才将配置类和业务类放到了一起使用。

package com.zonhor.config;

import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqttConfiguration {

  @Value("${com.mqtt.url}")
  private String url;

  @Value("${com.mqtt.username}")
  private String username;

  @Value("${com.mqtt.password}")
  private String password;

  @Value("${com.mqtt.subTopic}")
  private String subTopic;

  @Value("${com.mqtt.inBoundClientId}")
  private String inClientId;

  @Value("${com.mqtt.outBoundClientId}")
  private String outClientId;

  @Resource
  private MyGateway myGateway;
  
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setServerURIs(new String[]{url});
    mqttConnectOptions.setUserName(username);
    mqttConnectOptions.setPassword(password.toCharArray());
    factory.setConnectionOptions(mqttConnectOptions);
    return factory;
  }

  /**
   * InBound Begin 消息接收端
   ****/
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }

  @Bean
  public MessageProducer inbound() {
    // 此处将客户端ID后拼接了一个当前系统时间,为避免多个客户端ID重复
    String clientid = inClientId + "_" + System.currentTimeMillis();
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(url, clientid);

    // 配置MQTT客户端
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(0);
    adapter.setOutputChannel(mqttInputChannel());

    // 添加订阅,如果想订阅多个主题,可以重复下面这一行代码
    adapter.addTopic(subTopic);

    return adapter;
  }

  // ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return new MessageHandler() {
      // 消息消费
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        Map maps = new HashMap();
        try {
          maps = (Map) JSON.parse(message.getPayload().toString());
        } catch (Exception e) {

        }

        // 收到消息
        // 使用topic来判断收到的订阅主题名称
        // 使用message.getPayload().toString()来获取收到的具体内容
        System.out.println(topic + ":收到消息 " + message.getPayload().toString());

        // 发送消息示例
        // 此处演示了如果收到订阅消息,再将此消息发送到 /esp32c3/fish/data/cache 主题
        myGateway.sendToMqtt(message.getPayload().toString(), "/esp32c3/fish/data/cache");
      }
    };
  }

  /** InBound End ****/

  /**
   * OutBound Begin 消息发送端
   ****/

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  /*****
   * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
   *
   * @return
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    // 在这里进行mqttOutboundChannel的相关设置
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outClientId, mqttClientFactory());
    messageHandler.setAsync(true); // 如果设置成true,发送消息时将不会阻塞。
    messageHandler.setDefaultTopic("testTopic");
    return messageHandler;
  }

  @Component
  @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  public interface MyGateway {

    // 定义重载方法,用于消息发送
    void sendToMqtt(String data);

    // 指定topic进行消息发送
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

  }
  /** OutBound End ****/

}

四、效果演示

检查客户端已连接

我们运行SpringBoot后如果没有报错,就可以在EMQX中看到我们Java客户端的ID了,代表已连接成功。

测试订阅和发布

发布消息

我使用EMQX自带的WebSocket客户端工具进行测试,首先我们发布一条消息,返回springboot客户端控制台可以看到已经成功接收到此消息。

订阅消息

接下来我们在WebSocket客户端工具中订阅/esp32c3/fish/data/cache主题,然后再次点击下面的发布,我们就可以在”已接收“区域中看到来自SpringBoot客户端发布的相同内容的消息了。

添加新评论

评论列表