import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
private @Nullable ScheduledFuture<?> scheduledStop;
+ private AtomicBoolean isSubscribed;
+
public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
this.subscribeTopic = baseTopic;
this.timeout = timeout;
+ isSubscribed = new AtomicBoolean(false);
+ }
+
+ /**
+ * Only subscribe if we were not already subscribed
+ */
+ private void subscribe() {
+ if (!isSubscribed.getAndSet(true)) {
+ getDiscoveryService().subscribe(this, subscribeTopic);
+ }
+ }
+
+ /**
+ * Only unsubscribe if we were already subscribed
+ */
+ private void unSubscribe() {
+ if (isSubscribed.getAndSet(false)) {
+ getDiscoveryService().unsubscribe(this);
+ }
}
/**
return;
}
resetTimeout();
- getDiscoveryService().subscribe(this, subscribeTopic);
+ subscribe();
}
@Override
return;
}
stopTimeout();
- getDiscoveryService().unsubscribe(this);
+ unSubscribe();
super.stopScan();
}
protected void startBackgroundDiscovery() {
// Remove results that are restored after a restart
removeOlderResults(new Date().getTime());
- getDiscoveryService().subscribe(this, subscribeTopic);
+ subscribe();
}
@Override
protected void stopBackgroundDiscovery() {
- getDiscoveryService().unsubscribe(this);
+ unSubscribe();
}
}
*/
package org.openhab.binding.mqtt.internal;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component(service = { ThingHandlerFactory.class,
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {
+
private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet());
+
private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
- protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
+
+ /**
+ * This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value),
+ * where the Set itself is a list of participants which are subscribed to the respective Topic.
+ */
+ protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new ConcurrentHashMap<>();
+
+ /**
+ * This Set contains a list of all the Broker handlers that have been created by this factory
+ */
protected final Set<AbstractBrokerHandler> handlers = Collections
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
}
/**
- * Add the given broker connection to all listeners.
+ * Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective
+ * list of listeners, and register the respective new listener and topic with the given new broker handler.
*/
protected void createdHandler(AbstractBrokerHandler handler) {
handlers.add(handler);
- discoveryTopics.forEach((topic, listenerList) -> {
- listenerList.forEach(listener -> {
+ discoveryTopics.forEach((topic, listeners) -> {
+ listeners.forEach(listener -> {
handler.registerDiscoveryListener(listener, topic);
});
});
/**
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
* a MQTT topic that is registered on all available broker connections.
+ *
+ * Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that
+ * list, and registers itself and the respective topic with all the known brokers.
*/
@Override
+ @SuppressWarnings("null")
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
- List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
- t -> new ArrayList<>());
- listenerList.add(listener);
- handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
+ Set<MQTTTopicDiscoveryParticipant> listeners = discoveryTopics.computeIfAbsent(topic,
+ t -> ConcurrentHashMap.newKeySet());
+ if (listeners.add(listener)) {
+ handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
+ }
}
/**
- * Unsubscribe a listener from all available broker connections.
+ * This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from
+ * a MQTT topic that is registered on all available broker connections.
+ *
+ * Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and
+ * unregisters itself and the respective topic from all the known brokers.
*/
@Override
- @SuppressWarnings("null")
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
- discoveryTopics.forEach((topic, listenerList) -> {
- listenerList.remove(listener);
- handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
+ discoveryTopics.forEach((topic, listeners) -> {
+ if (listeners.remove(listener)) {
+ handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
+ }
});
}