]> git.basschouten.com Git - openhab-addons.git/commitdiff
[mqtt] Fix avail topics subscription after Brige Restart (#9851)
authorFlorian Albrecht <11006895+albrechtf@users.noreply.github.com>
Sat, 11 Dec 2021 16:57:13 +0000 (17:57 +0100)
committerGitHub <noreply@github.com>
Sat, 11 Dec 2021 16:57:13 +0000 (17:57 +0100)
Fixes #9850

Signed-off-by: Florian Albrecht <cw.florian.albrecht@gmx.de>
bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java
bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java
bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java

index 1c11e462857a2485ef3aea37c78da2fd8a6bebe4..14ecdc8510dd29a634e5e5c0eb11e01938fa5ac1 100644 (file)
@@ -12,7 +12,6 @@
  */
 package org.openhab.binding.mqtt.generic;
 
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -23,12 +22,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.binding.mqtt.generic.utils.FutureCollector;
 import org.openhab.binding.mqtt.generic.values.OnOffValue;
 import org.openhab.binding.mqtt.generic.values.Value;
 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
@@ -195,19 +192,7 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler
         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
         // class.
         try {
-            Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
-                if (s != null) {
-                    return s.start(connection, scheduler, 0);
-                }
-                return CompletableFuture.allOf();
-            }).collect(Collectors.toList());
-
-            futures.add(start(connection));
-
-            futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
-                updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
-                return null;
-            }).get(subscribeTimeout, TimeUnit.MILLISECONDS);
+            start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
                     "Did not receive all required topics");
index 8cc05429ec1b652f440d61f73ddecd43bcb53fd1..c157506fb8b52ea512abc32391e3597a1b8d2d63 100644 (file)
@@ -85,6 +85,9 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
      */
     @Override
     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
+        // availability topics are also started asynchronously, so no problem here
+        clearAllAvailabilityTopics();
+        initializeAvailabilityTopicsFromConfig();
         return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
                 .collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
     }
@@ -142,15 +145,7 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
 
     @Override
     public void initialize() {
-        GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
-
-        String availabilityTopic = config.availabilityTopic;
-
-        if (availabilityTopic != null) {
-            addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
-        } else {
-            clearAllAvailabilityTopics();
-        }
+        initializeAvailabilityTopicsFromConfig();
 
         List<ChannelUID> configErrors = new ArrayList<>();
         for (Channel channel : thing.getChannels()) {
@@ -193,4 +188,16 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
         }
     }
+
+    private void initializeAvailabilityTopicsFromConfig() {
+        GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
+
+        String availabilityTopic = config.availabilityTopic;
+
+        if (availabilityTopic != null) {
+            addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
+        } else {
+            clearAllAvailabilityTopics();
+        }
+    }
 }
index 13cf9946e686cff0ce176d6a415f774587f43c57..d9f41464ae22998b3c9f5cfb802f1fa0370816eb 100644 (file)
@@ -192,4 +192,16 @@ public class GenericThingHandlerTests {
         verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString())));
         assertThat(textValue.getChannelState().toString(), is("UPDATE"));
     }
+
+    @Test
+    public void handleBridgeStatusChange() {
+        Configuration config = new Configuration();
+        config.put("availabilityTopic", "test/LWT");
+        when(thing.getConfiguration()).thenReturn(config);
+        thingHandler.initialize();
+        thingHandler
+                .bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null));
+        thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null));
+        verify(connection, times(2)).subscribe(eq("test/LWT"), any());
+    }
 }