2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homeassistant.internal;
15 import java.lang.ref.WeakReference;
16 import java.util.HashSet;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.stream.Collectors;
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.mqtt.generic.AvailabilityTracker;
27 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
28 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
29 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
30 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
31 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
32 import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
33 import org.openhab.binding.mqtt.homeassistant.internal.exception.UnsupportedComponentException;
34 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
35 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
36 import org.openhab.core.thing.ThingUID;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import com.google.gson.Gson;
43 * Responsible for subscribing to the HomeAssistant MQTT components wildcard topic, either
44 * in a time limited discovery mode or as a background discovery.
46 * @author David Graeff - Initial contribution
49 public class DiscoverComponents implements MqttMessageSubscriber {
50 private final Logger logger = LoggerFactory.getLogger(DiscoverComponents.class);
51 private final ThingUID thingUID;
52 private final ScheduledExecutorService scheduler;
53 private final ChannelStateUpdateListener updateListener;
54 private final AvailabilityTracker tracker;
55 private final TransformationServiceProvider transformationServiceProvider;
57 protected final CompletableFuture<@Nullable Void> discoverFinishedFuture = new CompletableFuture<>();
58 private final Gson gson;
60 private @Nullable ScheduledFuture<?> stopDiscoveryFuture;
61 private WeakReference<@Nullable MqttBrokerConnection> connectionRef = new WeakReference<>(null);
62 protected @Nullable ComponentDiscovered discoveredListener;
63 private int discoverTime;
64 private Set<String> topics = new HashSet<>();
67 * Implement this to get notified of new components
69 public static interface ComponentDiscovered {
70 void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component);
74 * Create a new discovery object.
76 * @param thingUID The Thing UID to perform the discovery for.
77 * @param scheduler A scheduler for timeouts
78 * @param channelStateUpdateListener Channel update listener. Usually the handler.
80 public DiscoverComponents(ThingUID thingUID, ScheduledExecutorService scheduler,
81 ChannelStateUpdateListener channelStateUpdateListener, AvailabilityTracker tracker, Gson gson,
82 TransformationServiceProvider transformationServiceProvider) {
83 this.thingUID = thingUID;
84 this.scheduler = scheduler;
85 this.updateListener = channelStateUpdateListener;
87 this.tracker = tracker;
88 this.transformationServiceProvider = transformationServiceProvider;
92 public void processMessage(String topic, byte[] payload) {
93 if (!topic.endsWith("/config")) {
97 HaID haID = new HaID(topic);
98 String config = new String(payload);
99 AbstractComponent<?> component = null;
101 if (config.length() > 0) {
103 component = ComponentFactory.createComponent(thingUID, haID, config, updateListener, tracker, scheduler,
104 gson, transformationServiceProvider);
105 component.setConfigSeen();
107 logger.trace("Found HomeAssistant component {}", haID);
109 if (discoveredListener != null) {
110 discoveredListener.componentDiscovered(haID, component);
112 } catch (UnsupportedComponentException e) {
113 logger.warn("HomeAssistant discover error: thing {} component type is unsupported: {}", haID.objectID,
115 } catch (ConfigurationException e) {
116 logger.warn("HomeAssistant discover error: invalid configuration of thing {} component {}: {}",
117 haID.objectID, haID.component, e.getMessage());
118 } catch (Exception e) {
119 logger.warn("HomeAssistant discover error: {}", e.getMessage());
122 logger.warn("Configuration of HomeAssistant thing {} is empty", haID.objectID);
127 * Start a components discovery.
130 * We need to consider the case that the remote client is using node IDs
131 * and also the case that no node IDs are used.
134 * @param connection A MQTT broker connection
135 * @param discoverTime The time in milliseconds for the discovery to run. Can be 0 to disable the
137 * You need to call {@link #stopDiscovery()} at some
138 * point in that case.
139 * @param topicDescriptions Contains the object-id (=device id) and potentially a node-id as well.
140 * @param componentsDiscoveredListener Listener for results
141 * @return A future that completes normally after the given time in milliseconds or exceptionally on any error.
142 * Completes immediately if the timeout is disabled.
144 public CompletableFuture<@Nullable Void> startDiscovery(MqttBrokerConnection connection, int discoverTime,
145 Set<HaID> topicDescriptions, ComponentDiscovered componentsDiscoveredListener) {
146 this.topics = topicDescriptions.stream().map(id -> id.getTopic("config")).collect(Collectors.toSet());
147 this.discoverTime = discoverTime;
148 this.discoveredListener = componentsDiscoveredListener;
149 this.connectionRef = new WeakReference<>(connection);
151 // Subscribe to the wildcard topic and start receive MQTT retained topics
152 this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
153 .thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
155 return discoverFinishedFuture;
158 private void subscribeSuccess() {
159 final MqttBrokerConnection connection = connectionRef.get();
160 // Set up a scheduled future that will stop the discovery after the given time
161 if (connection != null && discoverTime > 0) {
162 this.stopDiscoveryFuture = scheduler.schedule(() -> {
163 this.stopDiscoveryFuture = null;
164 this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
165 this.discoveredListener = null;
166 discoverFinishedFuture.complete(null);
167 }, discoverTime, TimeUnit.MILLISECONDS);
169 // No timeout -> complete immediately
170 discoverFinishedFuture.complete(null);
174 private @Nullable Void subscribeFail(Throwable e) {
175 final ScheduledFuture<?> scheduledFuture = this.stopDiscoveryFuture;
176 if (scheduledFuture != null) { // Cancel timeout
177 scheduledFuture.cancel(false);
178 this.stopDiscoveryFuture = null;
180 this.discoveredListener = null;
181 final MqttBrokerConnection connection = connectionRef.get();
182 if (connection != null) {
183 this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
184 connectionRef.clear();
186 discoverFinishedFuture.completeExceptionally(e);
191 * Stops an ongoing discovery or do nothing if no discovery is running.
193 public void stopDiscovery() {
194 subscribeFail(new Throwable("Stopped"));