Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

95.2. MQTT Support

https://docs.spring.io/spring-integration/reference/mqtt.html

		
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.1</version>
</dependency>
		
		

95.2.1. 入站消息通道适配器

95.2.2. 出站通道适配器

			
package cn.netkiller.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Spring Integration wiring for publishing messages to an MQTT broker.
 *
 * <p>Declares a Paho client factory, an outbound message handler subscribed to
 * {@code mqttOutboundChannel}, the channel itself, and a messaging gateway that
 * application code can call to push payloads onto that channel.
 *
 * <p>Broker address and credentials are read from the {@code mqtt.broker},
 * {@code mqtt.username} and {@code mqtt.password} properties.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class MqttConfiguration {
    /** Broker URI, used as the single entry of the server URI list. */
    @Value("${mqtt.broker}")
    private String broker;
    /** User name presented to the broker. */
    @Value("${mqtt.username}")
    private String username;
    /** Password presented to the broker. */
    @Value("${mqtt.password}")
    private String password;

    /** Quality-of-service level; not referenced by any bean declared in this class. */
    private final int qos = 2;
    /** Topic prefix property; not referenced by any bean declared in this class. */
    @Value("${mqtt.topic.prefix}")
    private String prefix;

    /**
     * Builds the Paho client factory carrying the connection options shared by the handlers.
     *
     * @return a factory configured with the broker URI, credentials, automatic reconnect,
     *         a non-clean session and a keep-alive interval of 20
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{broker});
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());
        connectOptions.setKeepAliveInterval(20);
        connectOptions.setCleanSession(false);
        connectOptions.setAutomaticReconnect(true);
        // Optional timeouts, intentionally left disabled:
        //   connectOptions.setConnectionTimeout(30000);
        //   connectOptions.setExecutorServiceTimeout(30000);

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Outbound handler that consumes messages from {@code mqttOutboundChannel}.
     *
     * @return an asynchronous handler registered under the client id {@code testClient}
     *         whose default topic is {@code testTopic}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("testClient", mqttClientFactory());
        handler.setDefaultTopic("testTopic");
        handler.setAsync(true);
        return handler;
    }

    /**
     * Channel connecting {@link MyGateway} to the outbound handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Gateway whose calls are turned into messages on {@code mqttOutboundChannel}.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends a payload without setting a topic header.
         *
         * @param data message payload
         */
        void sendToMqtt(String data);

        /**
         * Sends a payload with the topic carried in the {@link MqttHeaders#TOPIC} header.
         *
         * @param topic destination topic
         * @param data  message payload
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

    }
}
			
			

			
    /** Gateway declared in {@code MqttConfiguration}; each call is forwarded to the MQTT outbound channel. */
    @Autowired
    private MqttConfiguration.MyGateway myGateway;

    /**
     * Handles {@code GET /mqtt} by sending the same test payload twice: once without
     * a topic header and once with the topic {@code neo}.
     *
     * @param principal the caller's principal; not used by this handler
     * @return an empty {@link Mono}
     */
    @GetMapping("/mqtt")
    public Mono<String> mqtt(Principal principal) {
        final String payload = "Test";
        final String explicitTopic = "neo";

        myGateway.sendToMqtt(payload);
        myGateway.sendToMqtt(explicitTopic, payload);

        return Mono.empty();
    }
			
			

95.2.3. @MessagingGateway 定义消息网关接口

			
    /**
     * Messaging gateway whose method calls are sent as messages to
     * {@code mqttOutboundChannel}. The overloads differ in which MQTT headers
     * they set and in the payload type.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends a text payload without setting any MQTT header.
         *
         * @param data message payload
         */
        void sendToMqtt(String data);

        /**
         * Sends a text payload with the topic placed in the {@link MqttHeaders#TOPIC} header.
         *
         * @param topic destination topic
         * @param data  message payload
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);

        /**
         * Sends a text payload with both the topic and the QoS level set as headers.
         *
         * @param topic   destination topic ({@link MqttHeaders#TOPIC})
         * @param qos     quality-of-service level ({@link MqttHeaders#QOS})
         * @param payload message payload
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

        /**
         * Sends a binary payload with both the topic and the QoS level set as headers.
         *
         * @param topic   destination topic ({@link MqttHeaders#TOPIC})
         * @param qos     quality-of-service level ({@link MqttHeaders#QOS})
         * @param payload raw message bytes
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);

    }			
			
			

95.2.4. 手动 ACK 应答

（本节内容待补充）

95.2.5. Spring Boot with MQTT v5

使用原生 paho 库

			
       <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>			
			
			

代码

			
package cn.netkiller.component;


import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Publishes JSON messages to an MQTT v5 broker using the native Paho client.
 *
 * <p>Every {@code publish} call opens its own connection, publishes one message,
 * then disconnects and closes the client. Failures reported by Paho are logged
 * and not rethrown, so publishing is best-effort.
 *
 * <p>All calls on one instance share the same {@link #clientId}.
 * NOTE(review): overlapping calls therefore connect under the same client id —
 * confirm callers do not publish concurrently from one instance.
 */
@Component
@Slf4j
@Data
public class MqttComponent {
    /** Timestamp layout used for the {@code time} field of sequence messages. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    /** Maximum wait for the broker connection, in milliseconds. */
    private static final long CONNECT_TIMEOUT_MS = 20000L;
    /** Maximum wait for a publish to complete, in milliseconds. */
    private static final long PUBLISH_TIMEOUT_MS = 30000L;
    /** Maximum wait for a graceful disconnect, in milliseconds. */
    private static final long DISCONNECT_TIMEOUT_MS = 10000L;

    /** Client id, fixed when the component is created. */
    private final String clientId = "netkiller-" + System.currentTimeMillis();

    private final Gson gson = new Gson();
    /** Quality-of-service level applied to every published message. */
    private final int qos = 2;
    /** Default first segment of topics built by {@link #topic(String, String, String)}. */
    @Value("${mqtt.topic.prefix}")
    private String prefix;

    // The connection settings are injected on the fields. With two constructors and no
    // @Autowired marker, Spring instantiates the component through the no-arg constructor,
    // so @Value on the other constructor's parameters would never be applied.
    @Value("${mqtt.broker}")
    private String broker;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /**
     * Creates a component whose settings are filled in afterwards, either by
     * property injection or through the generated setters.
     */
    public MqttComponent() {

    }

    /**
     * Creates a component with explicit connection settings, for use outside the container.
     *
     * @param broker   broker URI
     * @param username user name, or {@code null} to connect without credentials
     * @param password password belonging to {@code username}; may be {@code null}
     */
    public MqttComponent(String broker, String username, String password) {
        this.broker = broker;
        this.username = username;
        this.password = password;
    }

    /**
     * Publishes {@code content} for a device, wrapping it in the session/data JSON envelope.
     * The effective topic is {@code <prefix>/<device>/<topic>}.
     *
     * @param topic   last topic segment
     * @param device  device identifier, used as the middle topic segment
     * @param session session identifier stored under the {@code session} key
     * @param content payload stored under the {@code data} key
     */
    public void publish(String topic, String device, String session, String content) {
        this.publish(this.topic(null, device, topic), this.message(session, content));
    }

    /**
     * Builds a three-segment topic.
     *
     * @param prefix  first segment, or {@code null} to use the configured prefix
     * @param device  second segment
     * @param service third segment
     * @return {@code prefix/device/service}
     */
    public String topic(String prefix, String device, String service) {
        String effectivePrefix = (prefix == null) ? this.prefix : prefix;
        return String.format("%s/%s/%s", effectivePrefix, device, service);
    }

    /**
     * Builds the JSON document for one segment of a sequence, stamped with the current local time.
     * Keys are emitted in the order {@code sequence, session, segment, audio, time, state};
     * a key whose value is {@code null} is left out by Gson's default settings.
     *
     * @param sequence position of the segment
     * @param session  session identifier
     * @param segment  segment content
     * @param audio    audio reference
     * @param state    state marker
     * @return the JSON text
     */
    public String message(int sequence, String session, String segment, String audio, String state) {
        // An insertion-ordered map gives a stable key order; Map.of has none and rejects nulls.
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("sequence", sequence);
        fields.put("session", session);
        fields.put("segment", segment);
        fields.put("audio", audio);
        fields.put("time", LocalDateTime.now().format(TIMESTAMP_FORMAT));
        fields.put("state", state);
        return gson.toJson(fields);
    }

    /**
     * Builds the JSON envelope {@code {"session": ..., "data": ...}}.
     * A key whose value is {@code null} is left out by Gson's default settings.
     *
     * @param session session identifier
     * @param content payload
     * @return the JSON text
     */
    public String message(String session, String content) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("session", session);
        fields.put("data", content);
        return gson.toJson(fields);
    }

    /**
     * Connects to the broker, publishes {@code message} to {@code topic} as UTF-8 bytes,
     * then disconnects and closes the client. Paho failures are logged, not rethrown.
     *
     * @param topic   full destination topic
     * @param message message text
     */
    public void publish(String topic, String message) {
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setCleanStart(false);
        options.setConnectionTimeout(30);
        options.setKeepAliveInterval(20);
        if (username != null) {
            options.setUserName(username);
            if (password != null) {
                options.setPassword(password.getBytes(StandardCharsets.UTF_8));
            }
        }

        MqttAsyncClient client = null;
        try {
            client = new MqttAsyncClient(broker, clientId, new MemoryPersistence());
            IMqttToken connectToken = client.connect(options);
            connectToken.waitForCompletion(CONNECT_TIMEOUT_MS);

            if (connectToken.isComplete() && client.isConnected()) {
                // The password is deliberately kept out of the log.
                log.debug("Connected to broker: {} username: {}", broker, username);

                MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
                mqttMessage.setQos(qos);
                IMqttToken publishToken = client.publish(topic, mqttMessage);
                publishToken.waitForCompletion(PUBLISH_TIMEOUT_MS);
                if (publishToken.isComplete()) {
                    log.info("Publishing topic: {}, message: {}", topic, mqttMessage);
                }
            }
        } catch (MqttException e) {
            log.warn("Mqtt publish to topic {} failed, reason: {}", topic, e.getReasonCode(), e);
        } finally {
            release(client);
        }

    }

    /**
     * Publishes {@code content} wrapped in the session/data JSON envelope.
     *
     * @param topic   full destination topic
     * @param session session identifier
     * @param content payload
     */
    public void publish(String topic, String session, String content) {
        this.publish(topic, this.message(session, content));
    }

    /** Disconnects (when still connected) and closes the client, logging any failure. */
    private void release(MqttAsyncClient client) {
        if (client == null) {
            return;
        }
        try {
            // Paho refuses to close a client that is still connected, so disconnect first.
            if (client.isConnected()) {
                client.disconnect().waitForCompletion(DISCONNECT_TIMEOUT_MS);
            }
        } catch (MqttException e) {
            log.debug("Mqtt disconnect failed, reason: {}", e.getReasonCode(), e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            log.debug("Mqtt close failed, reason: {}", e.getReasonCode(), e);
        }
    }
}