]> git.basschouten.com Git - openhab-addons.git/commitdiff
Fix MQTT transport deprecations (#8570)
authorWouter Born <github@maindrain.net>
Thu, 24 Sep 2020 20:50:46 +0000 (22:50 +0200)
committerGitHub <noreply@github.com>
Thu, 24 Sep 2020 20:50:46 +0000 (13:50 -0700)
Signed-off-by: Wouter Born <github@maindrain.net>
bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/ChannelStateTests.java
bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/action/MQTTActions.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/MQTTTopicDiscoveryService.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/BrokerHandler.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/SystemBrokerHandler.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java
bundles/org.openhab.io.mqttembeddedbroker/src/test/java/org/openhab/io/mqttembeddedbroker/internal/MqttEmbeddedBrokerServiceTest.java

index a4c4e46d4fd800a879ee4935a596237c6034148d..3d0d9876e8428c9138a17b01f6231db9ed681e39 100644 (file)
@@ -79,7 +79,6 @@ public class ChannelStateTests {
         doReturn(voidFutureComplete).when(connection).unsubscribeAll();
         doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
         doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
-        doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
         doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
                 anyBoolean());
 
index 18a1948906aba095a22dacd69d064e335aad47d3..25571e96127a257be344adb4e89399bf2e46af41 100644 (file)
@@ -84,7 +84,6 @@ public class GenericThingHandlerTests {
         doReturn(voidFutureComplete).when(connection).unsubscribeAll();
         doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
         doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
-        doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
         doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
                 anyBoolean());
 
index 93d89bd71a872b63462733f13b7bf1c8d31e6b25..339236a729cb777dc552b48e008b60a1adfe0e74 100644 (file)
@@ -63,9 +63,9 @@ public class MQTTActions implements ThingActions, IMQTTActions {
     @Override
     @RuleAction(label = "@text/actionLabel", description = "@text/actionDesc")
     public void publishMQTT(
-            @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic,
-            @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value,
-            @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable Boolean retain) {
+            @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable final String topic,
+            @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable final String value,
+            @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable final Boolean retain) {
         AbstractBrokerHandler brokerHandler = handler;
         if (brokerHandler == null) {
             logger.warn("MQTT Action service ThingHandler is null!");
@@ -84,15 +84,14 @@ public class MQTTActions implements ThingActions, IMQTTActions {
             logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value);
             return;
         }
-        if (retain == null) {
-            retain = connection.isRetain();
-        }
-        connection.publish(topic, value.getBytes(), connection.getQos(), retain).thenRun(() -> {
-            logger.debug("MQTT publish to {} performed", topic);
-        }).exceptionally(e -> {
-            logger.warn("MQTT publish to {} failed!", topic);
-            return null;
-        });
+
+        connection.publish(topic, value.getBytes(), connection.getQos(), retain != null && retain.booleanValue())
+                .thenRun(() -> {
+                    logger.debug("MQTT publish to {} performed", topic);
+                }).exceptionally(e -> {
+                    logger.warn("MQTT publish to {} failed!", topic);
+                    return null;
+                });
     }
 
     public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) {
index a6843f46721c700c3f410a9da1e10c8ee93e6c8c..a2361f10084e15a77d92af0f5f3aac7ba1b4e59c 100644 (file)
@@ -42,8 +42,10 @@ public interface MQTTTopicDiscoveryService {
     /**
      * Publish a message to all connected brokers
      *
-     * @param topic The topic to publish on
-     * @param payload The message to publish
+     * @param topic The topic
+     * @param payload The message payload
+     * @param qos The quality of service for this message
+     * @param retain Set to true to retain the message on the broker
      */
-    void publish(String topic, byte[] payload);
+    void publish(String topic, byte[] payload, int qos, boolean retain);
 }
index 3e9f7f61ab676f59c38ecf0ec13fb79b78831153..e186fda76ecfca5aa23b5c59e9f71a75605a6e8f 100644 (file)
@@ -223,8 +223,6 @@ public class BrokerHandler extends AbstractBrokerHandler implements PinnedCallba
             connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT);
         }
 
-        connection.setRetain(config.retainMessages);
-
         return connection;
     }
 
index b70dd77383ff9ed74108be314cc098027bc00fb6..65a892ab9d801e55043ea170a243f45ccfe523f2 100644 (file)
@@ -69,7 +69,6 @@ public class SystemBrokerHandler extends AbstractBrokerHandler implements MqttSe
             properties.put(PROPERTY_PASSWORD, password);
         }
         properties.put(PROPERTY_QOS, String.valueOf(connection.getQos()));
-        properties.put(PROPERTY_RETAIN, String.valueOf(connection.isRetain()));
         final MqttWillAndTestament lastWill = connection.getLastWill();
         if (lastWill != null) {
             properties.put(PROPERTY_LAST_WILL, lastWill.toString());
index 1f98ed65d4f02eb9ecdad8b0a798c9d7dc7ed5fb..804783eb247c35829c07882d38f107eacb2b0f80 100644 (file)
  */
 package org.openhab.binding.mqtt.internal;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -127,10 +133,10 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
     }
 
     @Override
-    public void publish(String topic, byte[] payload) {
+    public void publish(String topic, byte[] payload, int qos, boolean retain) {
         handlers.forEach(handler -> {
             handler.getConnectionAsync().thenAccept(connection -> {
-                connection.publish(topic, payload);
+                connection.publish(topic, payload, qos, retain);
             });
         });
     }
index 92eb4f408ad1317968995612bc3f1932cc1af704..19ee819bc51e7b65379786e68832b67a6f269b3e 100644 (file)
@@ -42,6 +42,7 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.openhab.core.OpenHAB;
 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
+import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.MqttVersion;
 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
 import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
 import org.openhab.core.io.transport.mqtt.MqttConnectionState;
@@ -115,8 +116,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest {
         verify(service).addBrokerConnection(anyString(), eq(c));
 
         // Connect with a second connection but wrong credentials
-        MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false,
-                "wrongCred");
+        MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
+                c.getPort(), false, "wrongCred");
         wrongCredentials.setCredentials("someUser", "somePassword");
 
         if (wrongCredentials.start().get()) {
@@ -126,8 +127,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest {
         wrongCredentials.stop().get();
 
         // Connect with a second connection but correct credentials
-        MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(),
-                false, "correctCred");
+        MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
+                c.getPort(), false, "correctCred");
         correctCredentials.setCredentials(c.getUser(), c.getPassword());
 
         if (!correctCredentials.start().get()) {