]> git.basschouten.com Git - openhab-addons.git/blob
a24c5cabb418a206f1160ed4ded7653f07e96111
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homeassistant.internal;
14
15 import java.lang.ref.WeakReference;
16 import java.util.HashSet;
17 import java.util.Set;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.stream.Collectors;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.mqtt.generic.AvailabilityTracker;
27 import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
28 import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
29 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
30 import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
31 import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
32 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
33 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
34 import org.openhab.core.thing.ThingUID;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.google.gson.Gson;
39
40 /**
41  * Responsible for subscribing to the HomeAssistant MQTT components wildcard topic, either
42  * in a time limited discovery mode or as a background discovery.
43  *
44  * @author David Graeff - Initial contribution
45  */
46 @NonNullByDefault
47 public class DiscoverComponents implements MqttMessageSubscriber {
48     private final Logger logger = LoggerFactory.getLogger(DiscoverComponents.class);
49     private final ThingUID thingUID;
50     private final ScheduledExecutorService scheduler;
51     private final ChannelStateUpdateListener updateListener;
52     private final AvailabilityTracker tracker;
53     private final TransformationServiceProvider transformationServiceProvider;
54
55     protected final CompletableFuture<@Nullable Void> discoverFinishedFuture = new CompletableFuture<>();
56     private final Gson gson;
57
58     private @Nullable ScheduledFuture<?> stopDiscoveryFuture;
59     private WeakReference<@Nullable MqttBrokerConnection> connectionRef = new WeakReference<>(null);
60     protected @Nullable ComponentDiscovered discoveredListener;
61     private int discoverTime;
62     private Set<String> topics = new HashSet<>();
63
64     /**
65      * Implement this to get notified of new components
66      */
67     public static interface ComponentDiscovered {
68         void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?> component);
69     }
70
71     /**
72      * Create a new discovery object.
73      *
74      * @param thingUID The Thing UID to perform the discovery for.
75      * @param scheduler A scheduler for timeouts
76      * @param channelStateUpdateListener Channel update listener. Usually the handler.
77      */
78     public DiscoverComponents(ThingUID thingUID, ScheduledExecutorService scheduler,
79             ChannelStateUpdateListener channelStateUpdateListener, AvailabilityTracker tracker, Gson gson,
80             TransformationServiceProvider transformationServiceProvider) {
81         this.thingUID = thingUID;
82         this.scheduler = scheduler;
83         this.updateListener = channelStateUpdateListener;
84         this.gson = gson;
85         this.tracker = tracker;
86         this.transformationServiceProvider = transformationServiceProvider;
87     }
88
89     @Override
90     public void processMessage(String topic, byte[] payload) {
91         if (!topic.endsWith("/config")) {
92             return;
93         }
94
95         HaID haID = new HaID(topic);
96         String config = new String(payload);
97         AbstractComponent<?> component = null;
98
99         if (config.length() > 0) {
100             component = ComponentFactory.createComponent(thingUID, haID, config, updateListener, tracker, scheduler,
101                     gson, transformationServiceProvider);
102         }
103         if (component != null) {
104             component.setConfigSeen();
105
106             logger.trace("Found HomeAssistant thing {} component {}", haID.objectID, haID.component);
107             if (discoveredListener != null) {
108                 discoveredListener.componentDiscovered(haID, component);
109             }
110         } else {
111             logger.debug("Configuration of HomeAssistant thing {} invalid: {}", haID.objectID, config);
112         }
113     }
114
115     /**
116      * Start a components discovery.
117      *
118      * <p>
119      * We need to consider the case that the remote client is using node IDs
120      * and also the case that no node IDs are used.
121      * </p>
122      *
123      * @param connection A MQTT broker connection
124      * @param discoverTime The time in milliseconds for the discovery to run. Can be 0 to disable the
125      *            timeout.
126      *            You need to call {@link #stopDiscovery()} at some
127      *            point in that case.
128      * @param topicDescriptions Contains the object-id (=device id) and potentially a node-id as well.
129      * @param componentsDiscoveredListener Listener for results
130      * @return A future that completes normally after the given time in milliseconds or exceptionally on any error.
131      *         Completes immediately if the timeout is disabled.
132      */
133     public CompletableFuture<@Nullable Void> startDiscovery(MqttBrokerConnection connection, int discoverTime,
134             Set<HaID> topicDescriptions, ComponentDiscovered componentsDiscoveredListener) {
135         this.topics = topicDescriptions.stream().map(id -> id.getTopic("config")).collect(Collectors.toSet());
136         this.discoverTime = discoverTime;
137         this.discoveredListener = componentsDiscoveredListener;
138         this.connectionRef = new WeakReference<>(connection);
139
140         // Subscribe to the wildcard topic and start receive MQTT retained topics
141         this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
142                 .thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
143
144         return discoverFinishedFuture;
145     }
146
147     private void subscribeSuccess() {
148         final MqttBrokerConnection connection = connectionRef.get();
149         // Set up a scheduled future that will stop the discovery after the given time
150         if (connection != null && discoverTime > 0) {
151             this.stopDiscoveryFuture = scheduler.schedule(() -> {
152                 this.stopDiscoveryFuture = null;
153                 this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
154                 this.discoveredListener = null;
155                 discoverFinishedFuture.complete(null);
156             }, discoverTime, TimeUnit.MILLISECONDS);
157         } else {
158             // No timeout -> complete immediately
159             discoverFinishedFuture.complete(null);
160         }
161     }
162
163     private @Nullable Void subscribeFail(Throwable e) {
164         final ScheduledFuture<?> scheduledFuture = this.stopDiscoveryFuture;
165         if (scheduledFuture != null) { // Cancel timeout
166             scheduledFuture.cancel(false);
167             this.stopDiscoveryFuture = null;
168         }
169         this.discoveredListener = null;
170         final MqttBrokerConnection connection = connectionRef.get();
171         if (connection != null) {
172             this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
173             connectionRef.clear();
174         }
175         discoverFinishedFuture.completeExceptionally(e);
176         return null;
177     }
178
179     /**
180      * Stops an ongoing discovery or do nothing if no discovery is running.
181      */
182     public void stopDiscovery() {
183         subscribeFail(new Throwable("Stopped"));
184     }
185 }