]> git.basschouten.com Git - openhab-addons.git/commitdiff
[mqtt] Discovery services shall not unsubscribe unless they have already subscribed...
authorAndrew Fiddian-Green <software@whitebear.ch>
Sun, 8 Aug 2021 12:18:01 +0000 (13:18 +0100)
committerGitHub <noreply@github.com>
Sun, 8 Aug 2021 12:18:01 +0000 (14:18 +0200)
* [mqqt] do not allow unsubscribe unless already subscribed

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/AbstractMQTTDiscovery.java
bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java

index 5b83320b1de9e344d25d42ff6e3f71fec1701e9d..0732387a221ffb51d7360922c26e4fba25b3111d 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -48,11 +49,32 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
 
     private @Nullable ScheduledFuture<?> scheduledStop;
 
+    private AtomicBoolean isSubscribed;
+
     public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
             boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
         super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
         this.subscribeTopic = baseTopic;
         this.timeout = timeout;
+        isSubscribed = new AtomicBoolean(false);
+    }
+
+    /**
+     * Only subscribe if we were not already subscribed
+     */
+    private void subscribe() {
+        if (!isSubscribed.getAndSet(true)) {
+            getDiscoveryService().subscribe(this, subscribeTopic);
+        }
+    }
+
+    /**
+     * Only unsubscribe if we were already subscribed
+     */
+    private void unSubscribe() {
+        if (isSubscribed.getAndSet(false)) {
+            getDiscoveryService().unsubscribe(this);
+        }
     }
 
     /**
@@ -94,7 +116,7 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
             return;
         }
         resetTimeout();
-        getDiscoveryService().subscribe(this, subscribeTopic);
+        subscribe();
     }
 
     @Override
@@ -104,7 +126,7 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
             return;
         }
         stopTimeout();
-        getDiscoveryService().unsubscribe(this);
+        unSubscribe();
         super.stopScan();
     }
 
@@ -118,11 +140,11 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
     protected void startBackgroundDiscovery() {
         // Remove results that are restored after a restart
         removeOlderResults(new Date().getTime());
-        getDiscoveryService().subscribe(this, subscribeTopic);
+        subscribe();
     }
 
     @Override
     protected void stopBackgroundDiscovery() {
-        getDiscoveryService().unsubscribe(this);
+        unSubscribe();
     }
 }
index 5a8eec82b3ef13b524b63248b9573759fad81702..491f6e1b12ce68b15b825709badee35c2f1c60f2 100644 (file)
  */
 package org.openhab.binding.mqtt.internal;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -54,11 +52,22 @@ import org.slf4j.LoggerFactory;
 @Component(service = { ThingHandlerFactory.class,
         MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
 public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {
+
     private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
             .of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
             .collect(Collectors.toSet());
+
     private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
-    protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
+
+    /**
+     * This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value),
+     * where the Set itself is a list of participants which are subscribed to the respective Topic.
+     */
+    protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new ConcurrentHashMap<>();
+
+    /**
+     * This Set contains a list of all the Broker handlers that have been created by this factory
+     */
     protected final Set<AbstractBrokerHandler> handlers = Collections
             .synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
 
@@ -75,12 +84,13 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
     }
 
     /**
-     * Add the given broker connection to all listeners.
+     * Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective
+     * list of listeners, and register the respective new listener and topic with the given new broker handler.
      */
     protected void createdHandler(AbstractBrokerHandler handler) {
         handlers.add(handler);
-        discoveryTopics.forEach((topic, listenerList) -> {
-            listenerList.forEach(listener -> {
+        discoveryTopics.forEach((topic, listeners) -> {
+            listeners.forEach(listener -> {
                 handler.registerDiscoveryListener(listener, topic);
             });
         });
@@ -111,24 +121,33 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
     /**
      * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
      * a MQTT topic that is registered on all available broker connections.
+     *
+     * Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that
+     * list, and registers itself and the respective topic with all the known brokers.
      */
     @Override
+    @SuppressWarnings("null")
     public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
-        List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
-                t -> new ArrayList<>());
-        listenerList.add(listener);
-        handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
+        Set<MQTTTopicDiscoveryParticipant> listeners = discoveryTopics.computeIfAbsent(topic,
+                t -> ConcurrentHashMap.newKeySet());
+        if (listeners.add(listener)) {
+            handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
+        }
     }
 
     /**
-     * Unsubscribe a listener from all available broker connections.
+     * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from
+     * a MQTT topic that is registered on all available broker connections.
+     *
+     * Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and
+     * unregisters itself and the respective topic from all the known brokers.
      */
     @Override
-    @SuppressWarnings("null")
     public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
-        discoveryTopics.forEach((topic, listenerList) -> {
-            listenerList.remove(listener);
-            handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
+        discoveryTopics.forEach((topic, listeners) -> {
+            if (listeners.remove(listener)) {
+                handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
+            }
         });
     }