130.3. Android MQTT v5 例子
package cn.netkiller.ropeway.service;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import android.util.Log;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
/**
 * Started (non-bindable) service demonstrating the Eclipse Paho MQTT v5 asynchronous client.
 *
 * <p>Lifecycle:
 * <ul>
 *   <li>{@link #onCreate()} installs the callback and connects to {@link #broker}, then
 *       subscribes to {@code /netkiller/message}.</li>
 *   <li>{@link #onStartCommand(Intent, int, int)} publishes {@link #content} to {@link #topic}.</li>
 *   <li>{@link #onDestroy()} disconnects and closes the client.</li>
 * </ul>
 *
 * <p>Android invokes the lifecycle methods on the main thread, so every blocking MQTT operation
 * is handed to a single-thread executor. Using one worker thread also keeps the operations in
 * submission order: connect, then publishes, then disconnect/close.
 */
public class MyService extends Service {
    private static final String TAG = "Service";
    private static final String SUBSCRIBE_TOPIC = "/netkiller/message";

    MqttAsyncClient mqttAsyncClient;
    // Written from the worker thread, hence volatile for readers on other threads.
    volatile IMqttToken token;
    String topic = "/netkiller/test";
    String content = "Helloworld!!!";
    int qos = 2;
    String broker = "tcp://broker.emqx.io:1883";
    String clientId = "JavaSample" + System.currentTimeMillis();

    /** Runs all blocking MQTT calls off the main thread, one at a time and in order. */
    private final ExecutorService mqttExecutor = Executors.newSingleThreadExecutor();

    /**
     * Creates the MQTT client with in-memory persistence.
     *
     * <p>If creation fails the error is logged and {@link #mqttAsyncClient} stays {@code null};
     * the lifecycle methods check for that and skip their MQTT work.
     */
    public MyService() {
        try {
            mqttAsyncClient = new MqttAsyncClient(broker, clientId, new MemoryPersistence());
        } catch (MqttException me) {
            Log.e(TAG, "Unable to create MQTT client, reason code " + me.getReasonCode(), me);
        }
    }

    /**
     * This service does not support binding.
     *
     * @return always {@code null}
     */
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /** Installs the MQTT callback and schedules the connect-and-subscribe sequence. */
    @Override
    public void onCreate() {
        super.onCreate();
        Log.d(TAG, "onCreate() executed");
        if (mqttAsyncClient == null) {
            Log.e(TAG, "MQTT client is unavailable; not connecting");
            return;
        }
        // The callback must be in place before connecting, otherwise messages and
        // connection events that arrive right after the handshake can be missed.
        mqttAsyncClient.setCallback(createCallback());
        mqttExecutor.execute(new Runnable() {
            @Override
            public void run() {
                connectAndSubscribe();
            }
        });
    }

    /**
     * Schedules a publish of {@link #content} to {@link #topic}.
     *
     * @return the value of {@code super.onStartCommand(intent, flags, startId)}
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Log.d(TAG, "onStartCommand() executed");
        mqttExecutor.execute(new Runnable() {
            @Override
            public void run() {
                publishContent();
            }
        });
        return super.onStartCommand(intent, flags, startId);
    }

    /** Schedules disconnect and close of the client, then lets the worker thread wind down. */
    @Override
    public void onDestroy() {
        mqttExecutor.execute(new Runnable() {
            @Override
            public void run() {
                disconnectAndClose();
            }
        });
        // shutdown() still runs the tasks already queued, including the one above.
        mqttExecutor.shutdown();
        Log.d(TAG, "onDestroy() executed");
        super.onDestroy();
    }

    /** Connects to the broker (blocking the worker thread) and subscribes on success. */
    private void connectAndSubscribe() {
        try {
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setCleanStart(false);
            options.setAutomaticReconnect(true);
            Log.d(TAG, "Connecting to broker: " + broker);
            token = mqttAsyncClient.connect(options);
            // Throws MqttException if the connection attempt fails.
            token.waitForCompletion();
            Log.d(TAG, "Connected");
            subscribeToMessages();
        } catch (MqttException e) {
            Log.e(TAG, "Failed to connect to " + broker, e);
        }
    }

    /** Requests the subscription without waiting for the broker's acknowledgement. */
    private void subscribeToMessages() {
        try {
            mqttAsyncClient.subscribe(SUBSCRIBE_TOPIC, qos);
        } catch (MqttException e) {
            Log.e(TAG, "Failed to subscribe to " + SUBSCRIBE_TOPIC, e);
        }
    }

    /** Publishes {@link #content}; completion is reported through the callback. */
    private void publishContent() {
        if (mqttAsyncClient == null) {
            Log.e(TAG, "MQTT client is unavailable; not publishing");
            return;
        }
        try {
            Log.d(TAG, "Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(qos);
            // Not waiting here keeps the worker free for the disconnect/close task.
            token = mqttAsyncClient.publish(topic, message);
        } catch (MqttException e) {
            Log.e(TAG, "Failed to publish to " + topic, e);
        }
    }

    /** Disconnects if connected, then closes the client to release its resources. */
    private void disconnectAndClose() {
        if (mqttAsyncClient == null) {
            return;
        }
        try {
            // close() is documented to fail unless the client is disconnected,
            // so disconnect first and wait for it to finish.
            if (mqttAsyncClient.isConnected()) {
                mqttAsyncClient.disconnect().waitForCompletion();
                Log.d(TAG, "Disconnected");
            }
        } catch (MqttException e) {
            Log.w(TAG, "Disconnect failed", e);
        }
        try {
            mqttAsyncClient.close();
            Log.d(TAG, "Close client.");
        } catch (MqttException e) {
            Log.e(TAG, "Failed to close MQTT client", e);
        }
    }

    /** Builds the callback that logs client events and re-subscribes after a reconnect. */
    private MqttCallback createCallback() {
        return new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {
                Log.w(TAG, "Disconnected: " + disconnectResponse);
            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {
                Log.e(TAG, "MQTT error", exception);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                Log.d(TAG, String.format("接收消息 Id:%s, Topic: %s, QoS: %s, Message: %s, ", message.getId(), topic, message.getQos(), payload));
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                Log.d(TAG, "Delivery complete");
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // The first connection subscribes in connectAndSubscribe(). After an automatic
                // reconnect the broker may no longer hold the subscription, so request it
                // again. The call is non-blocking, which is required inside a callback.
                if (reconnect) {
                    Log.d(TAG, "Reconnected to " + serverURI);
                    subscribeToMessages();
                }
            }

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {
                // Enhanced authentication is not used by this example.
            }
        };
    }
}