Вопрос:
Я пытаюсь изучить MQTT и играть с ним. Я написал клиент для публикации и клиент для подписки (см. Ниже).
Если я запустил клиент подписки, а затем запустил клиент публикации (в то время как подписка запущена), тогда все работает нормально. Мой клиент подписки получает сообщения, опубликованные в теме правильно.
Однако, если я сначала запускаю клиента публикации (т.е. Публикую сообщение в теме), а затем запускаю клиент подписки, я не получаю никаких сообщений.
Другими словами, если я сначала подключаюсь к суб-клиенту, а затем публикую сообщения с клиентом pub при подключении суб-клиента, все работает нормально. Однако, если я сначала опубликую сообщение, а затем подключитесь к моему суб-клиенту, я не получаю никаких сообщений. Мое понимание заключается в том, что я должен получать сообщения, которые присутствуют в теме, когда я подключаюсь к клиенту и подписаться на эту тему.
Я обнаружил, что похоже на аналогичную проблему: не удается получить уже опубликованные сообщения на тему подписки на mqtt paho, хотя этот случай выглядит несколько иначе. Я попытался изменить различные настройки QoS или флаг ClearSession, но это не решило проблему.
Любая помощь будет оценена!
Опубликовать клиент:
public class MQTT_Client_Pub implements MqttCallback{ MqttClient client; public static void main(String[] args) { new MQTT_Client_Pub().mqttPub(); } public void mqttPub(){ try { this.setConnection(); // Connect client.connect(); // Create new message MqttMessage message = new MqttMessage(); message.setPayload(«A single test message from b112358».getBytes()); message.setQos(0); // Publish message to a topic System.out.println(«Publishing a message.»); client.publish(«pahodemo/test/b112358», message); // Disconnect client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } catch (Exception e){ e.printStackTrace(); } } public void setConnection(){ // Client try{ client = new MqttClient(«tcp://iot.eclipse.org:1883», «mqtt_test_b112358_pub»); } catch (MqttException e) { e.printStackTrace(); } // Connection Options MqttConnectOptions options = new MqttConnectOptions(); // Set the will options.setWill(«pahodemo/clienterrors», «CRASHED — CONNECTION NOT CLOSED CLEANLY».getBytes(),2,true); // Set Callback client.setCallback(this); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println(«Message delivered to the broker.»); } public void messageArrived(String topic, MqttMessage message) throws Exception {} public void connectionLost(Throwable cause) {}
}
Подписываться Клиенту:
public class MQTT_Client_Sub implements MqttCallback{ MqttClient client; public static void main(String[] args) { new MQTT_Client_Sub().mqttSub(); } public void mqttSub(){ try { // Set connection this.setConnection(); // Connect client.connect(); // Subscribe client.subscribe(«pahodemo/test/b112358», 0); // Disconnect // client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } public void setConnection(){ try { // Client client = new MqttClient(«tcp://iot.eclipse.org:1883», «mqtt_test_b112358_sub»); } catch (MqttException e) { e.printStackTrace(); } // Connection Options MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); // Set the will options.setWill(«pahodemo/clienterrors», «CRASHED — CONNECTION NOT CLOSED CLEANLY».getBytes(),2,true); client.setCallback(this); } public void deliveryComplete(IMqttDeliveryToken token) {} public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(«Message Arrived: » + message.getPayload() + » on tipic: » + topic.getBytes()); } public void connectionLost(Throwable cause) {}
}
Ответ №1
Сообщения, опубликованные до подключения подписчика и подписки, будут доставлены только в следующих двух ситуациях
-
Когда сообщения были опубликованы как сохраненные. Это означает, что последнее сообщение по этой теме будет доставлено новому абоненту в момент подписки. Это приведет только к последнему сообщению.
-
Если клиент был ранее подключен и подписан, он был отключен. Затем публикуется сообщение, и клиент снова соединяется с cleansession = false. (и когда подписка на QOS1/2)
Это может помочь: