]> git.basschouten.com Git - openhab-addons.git/blob
c6432e1ad13a2ad866cd13812d5136372653f127
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal;
14
import java.lang.ref.WeakReference;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.AvailabilityTracker;
import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
import org.openhab.binding.mqtt.homeassistant.internal.exception.UnsupportedComponentException;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
import org.openhab.core.thing.ThingUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
41
42 /**
43  * Responsible for subscribing to the HomeAssistant MQTT components wildcard topic, either
44  * in a time limited discovery mode or as a background discovery.
45  *
46  * @author David Graeff - Initial contribution
47  */
48 @NonNullByDefault
49 public class DiscoverComponents implements MqttMessageSubscriber {
50     private final Logger logger = LoggerFactory.getLogger(DiscoverComponents.class);
51     private final ThingUID thingUID;
52     private final ScheduledExecutorService scheduler;
53     private final ChannelStateUpdateListener updateListener;
54     private final AvailabilityTracker tracker;
55     private final TransformationServiceProvider transformationServiceProvider;
56
57     protected final CompletableFuture<@Nullable Void> discoverFinishedFuture = new CompletableFuture<>();
58     private final Gson gson;
59
60     private @Nullable ScheduledFuture<?> stopDiscoveryFuture;
61     private WeakReference<@Nullable MqttBrokerConnection> connectionRef = new WeakReference<>(null);
62     protected @Nullable ComponentDiscovered discoveredListener;
63     private int discoverTime;
64     private Set<String> topics = new HashSet<>();
65
66     /**
67      * Implement this to get notified of new components
68      */
69     public static interface ComponentDiscovered {
70         void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component);
71     }
72
73     /**
74      * Create a new discovery object.
75      *
76      * @param thingUID The Thing UID to perform the discovery for.
77      * @param scheduler A scheduler for timeouts
78      * @param channelStateUpdateListener Channel update listener. Usually the handler.
79      */
80     public DiscoverComponents(ThingUID thingUID, ScheduledExecutorService scheduler,
81             ChannelStateUpdateListener channelStateUpdateListener, AvailabilityTracker tracker, Gson gson,
82             TransformationServiceProvider transformationServiceProvider) {
83         this.thingUID = thingUID;
84         this.scheduler = scheduler;
85         this.updateListener = channelStateUpdateListener;
86         this.gson = gson;
87         this.tracker = tracker;
88         this.transformationServiceProvider = transformationServiceProvider;
89     }
90
91     @Override
92     public void processMessage(String topic, byte[] payload) {
93         if (!topic.endsWith("/config")) {
94             return;
95         }
96
97         HaID haID = new HaID(topic);
98         String config = new String(payload);
99         AbstractComponent<?> component = null;
100
101         if (config.length() > 0) {
102             try {
103                 component = ComponentFactory.createComponent(thingUID, haID, config, updateListener, tracker, scheduler,
104                         gson, transformationServiceProvider);
105                 component.setConfigSeen();
106
107                 logger.trace("Found HomeAssistant component {}", haID);
108
109                 if (discoveredListener != null) {
110                     discoveredListener.componentDiscovered(haID, component);
111                 }
112             } catch (UnsupportedComponentException e) {
113                 logger.warn("HomeAssistant discover error: thing {} component type is unsupported: {}", haID.objectID,
114                         haID.component);
115             } catch (ConfigurationException e) {
116                 logger.warn("HomeAssistant discover error: invalid configuration of thing {} component {}: {}",
117                         haID.objectID, haID.component, e.getMessage());
118             } catch (Exception e) {
119                 logger.warn("HomeAssistant discover error: {}", e.getMessage());
120             }
121         } else {
122             logger.warn("Configuration of HomeAssistant thing {} is empty", haID.objectID);
123         }
124     }
125
126     /**
127      * Start a components discovery.
128      *
129      * <p>
130      * We need to consider the case that the remote client is using node IDs
131      * and also the case that no node IDs are used.
132      * </p>
133      *
134      * @param connection A MQTT broker connection
135      * @param discoverTime The time in milliseconds for the discovery to run. Can be 0 to disable the
136      *            timeout.
137      *            You need to call {@link #stopDiscovery()} at some
138      *            point in that case.
139      * @param topicDescriptions Contains the object-id (=device id) and potentially a node-id as well.
140      * @param componentsDiscoveredListener Listener for results
141      * @return A future that completes normally after the given time in milliseconds or exceptionally on any error.
142      *         Completes immediately if the timeout is disabled.
143      */
144     public CompletableFuture<@Nullable Void> startDiscovery(MqttBrokerConnection connection, int discoverTime,
145             Set<HaID> topicDescriptions, ComponentDiscovered componentsDiscoveredListener) {
146         this.topics = topicDescriptions.stream().map(id -> id.getTopic("config")).collect(Collectors.toSet());
147         this.discoverTime = discoverTime;
148         this.discoveredListener = componentsDiscoveredListener;
149         this.connectionRef = new WeakReference<>(connection);
150
151         // Subscribe to the wildcard topic and start receive MQTT retained topics
152         this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
153                 .thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
154
155         return discoverFinishedFuture;
156     }
157
158     private void subscribeSuccess() {
159         final MqttBrokerConnection connection = connectionRef.get();
160         // Set up a scheduled future that will stop the discovery after the given time
161         if (connection != null && discoverTime > 0) {
162             this.stopDiscoveryFuture = scheduler.schedule(() -> {
163                 this.stopDiscoveryFuture = null;
164                 this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
165                 this.discoveredListener = null;
166                 discoverFinishedFuture.complete(null);
167             }, discoverTime, TimeUnit.MILLISECONDS);
168         } else {
169             // No timeout -> complete immediately
170             discoverFinishedFuture.complete(null);
171         }
172     }
173
174     private @Nullable Void subscribeFail(Throwable e) {
175         final ScheduledFuture<?> scheduledFuture = this.stopDiscoveryFuture;
176         if (scheduledFuture != null) { // Cancel timeout
177             scheduledFuture.cancel(false);
178             this.stopDiscoveryFuture = null;
179         }
180         this.discoveredListener = null;
181         final MqttBrokerConnection connection = connectionRef.get();
182         if (connection != null) {
183             this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
184             connectionRef.clear();
185         }
186         discoverFinishedFuture.completeExceptionally(e);
187         return null;
188     }
189
190     /**
191      * Stops an ongoing discovery or do nothing if no discovery is running.
192      */
193     public void stopDiscovery() {
194         subscribeFail(new Throwable("Stopped"));
195     }
196 }