| 知乎专栏 |
https://docs.spring.io/spring-integration/reference/mqtt.html
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.1</version>
</dependency>
package cn.netkiller.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Spring Integration wiring for publishing to an MQTT broker.
 *
 * <p>Declares a Paho client factory built from the {@code mqtt.*} properties, an outbound
 * message handler subscribed to the {@code mqttOutboundChannel} channel, that channel itself,
 * and a messaging gateway ({@link MyGateway}) that application code calls to send messages.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class MqttConfiguration {

    private final int qos = 2;

    @Value("${mqtt.broker}")
    private String broker;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.topic.prefix}")
    private String prefix;

    /**
     * Channel that carries gateway requests to the outbound MQTT handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Builds the Paho client factory from the injected broker URI and credentials.
     *
     * @return a factory whose connection options use automatic reconnect, a non-clean session
     *         and a keep-alive interval of 20
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{broker});
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());
        connectOptions.setAutomaticReconnect(true);
        connectOptions.setCleanSession(false);
        connectOptions.setKeepAliveInterval(20);

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Outbound handler that consumes messages from {@code mqttOutboundChannel}.
     *
     * @return an asynchronous handler created with client id {@code testClient} and default
     *         topic {@code testTopic}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("testClient", mqttClientFactory());
        handler.setDefaultTopic("testTopic");
        handler.setAsync(true);
        return handler;
    }

    /**
     * Gateway whose calls are sent to {@code mqttOutboundChannel}.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends {@code data} without setting a topic header.
         *
         * @param data the content to send
         */
        void sendToMqtt(String data);

        /**
         * Sends {@code data} with the {@link MqttHeaders#TOPIC} header set to {@code topic}.
         *
         * @param topic value of the topic header
         * @param data  the content to send
         */
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
    }
}
/** Injected gateway used to hand messages over to the MQTT outbound flow. */
@Autowired
private MqttConfiguration.MyGateway myGateway;

/**
 * Handles {@code GET /mqtt} by sending the payload {@code "Test"} twice through the gateway:
 * once without an explicit topic and once with the topic {@code "neo"}.
 *
 * @param principal the caller's principal; not used by this handler
 * @return an empty {@link Mono}
 */
@GetMapping("/mqtt")
public Mono<String> mqtt(Principal principal) {
    final String payload = "Test";
    myGateway.sendToMqtt(payload);
    myGateway.sendToMqtt("neo", payload);
    return Mono.empty();
}
/**
 * Messaging gateway whose default request channel is {@code mqttOutboundChannel}.
 *
 * <p>Arguments annotated with {@code @Header} are bound to the named message header; the
 * remaining unannotated argument is presumably used as the message payload (Spring
 * Integration gateway convention -- confirm against the gateway documentation).
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
/**
 * Sends {@code data} without setting a topic or QoS header.
 *
 * @param data the content to send
 */
void sendToMqtt(String data);
/**
 * Sends {@code data} with the {@code MqttHeaders.TOPIC} header set.
 *
 * @param topic value for the {@code MqttHeaders.TOPIC} header
 * @param data  the content to send
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
/**
 * Sends a string payload with both the topic and the QoS headers set.
 *
 * @param topic   value for the {@code MqttHeaders.TOPIC} header
 * @param qos     value for the {@code MqttHeaders.QOS} header
 * @param payload the content to send
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
/**
 * Sends a binary payload with both the topic and the QoS headers set.
 *
 * @param topic   value for the {@code MqttHeaders.TOPIC} header
 * @param qos     value for the {@code MqttHeaders.QOS} header
 * @param payload the content to send, as raw bytes
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
代码
package cn.netkiller.component;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
@Component
@Slf4j
@Data
public class MqttComponent {
private final String clientId = "netkiller-" + System.currentTimeMillis();
private final Gson gson = new Gson();
private final int qos = 2;
@Value("${mqtt.topic.prefix}")
private String prefix;
private String broker;
private String username;
private String password;
public MqttComponent() {
}
public MqttComponent(@Value("${mqtt.broker}") String broker, @Value("${mqtt.username}") String username, @Value("${mqtt.password}") String password) {
this.broker = broker;
this.username = username;
this.password = password;
}
public void publish(String topic, String device, String session, String content) {
// Thread.currentThread().setName(this.getClass().getSimpleName());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setCleanStart(false);
// options.setAutomaticReconnect(true);
options.setConnectionTimeout(30);
options.setKeepAliveInterval(20);
MemoryPersistence persistence = new MemoryPersistence();
try {
if (username != null) {
options.setUserName(username);
options.setPassword(password.getBytes());
}
MqttAsyncClient client = new MqttAsyncClient(broker, clientId, persistence);
IMqttToken token = client.connect(options);
token.waitForCompletion(20000L);
if (token.isComplete()) {
log.debug("Connecting to broker: {} username: {} password: {} ", broker, username, password);
// log.debug("Auth username: {} password: {} Connected!", username, password);
topic = prefix.concat("/".concat(device).concat("/").concat(topic));
String jsonString = gson.toJson(Map.of("session", session, "data", content), LinkedHashMap.class);
byte[] payload = jsonString.getBytes();
// if (client == null || !client.isConnected()) {
// this.connect();
// }
try {
if (client.isConnected()) {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
token = client.publish(topic, message);
token.waitForCompletion(30000L);
if (token.isComplete()) {
// log.info(String.format("Published topic: %s, message: %s", topic, message));
log.info(String.format("Publishing topic: %s, message: %s", topic, message));
}
}
} finally {
client.close();
}
}
} catch (MqttException e) {
log.debug("Mqtt reason: " + e.getReasonCode() + ", cause: " + e.getCause() + ", msg: " + e.getMessage());
}
}
public String topic(String prefix, String device, String service) {
if (prefix == null) {
prefix = this.prefix;
}
return String.format("%s/%s/%s", prefix, device, service);
}
public String message(int sequence, String session, String segment, String audio, String state) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime dateTime = LocalDateTime.now();
String formattedDateTime = dateTime.format(formatter);
String jsonString = gson.toJson(Map.of("sequence", sequence, "session", session, "segment", segment, "audio", audio, "time", formattedDateTime, "state", state), LinkedHashMap.class);
return jsonString;
}
public String message(String session, String content) {
// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// LocalDateTime dateTime = LocalDateTime.now();
// String formattedDateTime = dateTime.format(formatter);
String jsonString = gson.toJson(Map.of("session", session, "data", content), LinkedHashMap.class);
return jsonString;
}
public void publish(String topic, String message) {
// Thread.currentThread().setName(this.getClass().getSimpleName());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setCleanStart(false);
// options.setAutomaticReconnect(true);
options.setConnectionTimeout(30);
options.setKeepAliveInterval(20);
MemoryPersistence persistence = new MemoryPersistence();
try {
if (username != null) {
options.setUserName(username);
options.setPassword(password.getBytes());
}
MqttAsyncClient client = new MqttAsyncClient(broker, clientId, persistence);
IMqttToken token = client.connect(options);
token.waitForCompletion(20000L);
if (token.isComplete()) {
log.debug("Connecting to broker: {} username: {} password: {} ", broker, username, password);
byte[] payload = message.getBytes();
try {
if (client.isConnected()) {
MqttMessage mqttMessage = new MqttMessage(payload);
mqttMessage.setQos(qos);
token = client.publish(topic, mqttMessage);
token.waitForCompletion(30000L);
if (token.isComplete()) {
log.info(String.format("Publishing topic: %s, message: %s", topic, mqttMessage));
}
}
} finally {
client.close();
}
}
} catch (MqttException e) {
log.debug("Mqtt reason: " + e.getReasonCode() + ", cause: " + e.getCause() + ", msg: " + e.getMessage());
}
}
public void publish(String topic, String session, String content) {
this.publish(topic, this.message(session, content));
}
}