]> git.basschouten.com Git - openhab-addons.git/blob
1c11e462857a2485ef3aea37c78da2fd8a6bebe4
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.Collection;
16 import java.util.HashSet;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.CompletableFuture;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.stream.Collectors;
27
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.eclipse.jdt.annotation.NonNullByDefault;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
32 import org.openhab.binding.mqtt.generic.values.OnOffValue;
33 import org.openhab.binding.mqtt.generic.values.Value;
34 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
35 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
36 import org.openhab.core.library.types.OnOffType;
37 import org.openhab.core.thing.Bridge;
38 import org.openhab.core.thing.ChannelGroupUID;
39 import org.openhab.core.thing.ChannelUID;
40 import org.openhab.core.thing.Thing;
41 import org.openhab.core.thing.ThingStatus;
42 import org.openhab.core.thing.ThingStatusDetail;
43 import org.openhab.core.thing.ThingStatusInfo;
44 import org.openhab.core.thing.binding.BaseThingHandler;
45 import org.openhab.core.types.Command;
46 import org.openhab.core.types.RefreshType;
47 import org.openhab.core.types.State;
48 import org.openhab.core.types.UnDefType;
49 import org.openhab.core.util.UIDUtils;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
55  * want to inherit from here.
56  *
57  * <p>
58  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
59  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
60  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
61  *
62  * <p>
63  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
64  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
65  * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
66  *
67  * <p>
68  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
69  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
70  *
71  * <p>
72  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
73  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
74  * because this base class implements {@link ChannelStateUpdateListener}.
75  *
76  * @author David Graeff - Initial contribution
77  */
78 @NonNullByDefault
79 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
80         implements ChannelStateUpdateListener, AvailabilityTracker {
81     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
82     // Timeout for the entire tree parsing and subscription
83     private final int subscribeTimeout;
84
85     protected @Nullable MqttBrokerConnection connection;
86
87     private AtomicBoolean messageReceived = new AtomicBoolean(false);
88     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
89
90     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
91         super(thing);
92         this.subscribeTimeout = subscribeTimeout;
93     }
94
95     /**
96      * Return the channel state for the given channelUID.
97      *
98      * @param channelUID The channelUID
99      * @return A channel state. May be null.
100      */
101     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
102
103     /**
104      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
105      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
106      *
107      * @param connection A started broker connection
108      * @return A future that completes normal on success and exceptionally on any errors.
109      */
110     protected abstract CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection);
111
112     /**
113      * Called when the MQTT connection disappeared.
114      * You should clean up all resources that depend on a working connection.
115      */
116     protected void stop() {
117         clearAllAvailabilityTopics();
118         resetMessageReceived();
119     }
120
121     @Override
122     public void handleCommand(ChannelUID channelUID, Command command) {
123         if (connection == null) {
124             return;
125         }
126
127         final @Nullable ChannelState data = getChannelState(channelUID);
128
129         if (data == null) {
130             logger.warn("Channel {} not supported!", channelUID);
131             return;
132         }
133
134         if (command instanceof RefreshType) {
135             State state = data.getCache().getChannelState();
136             if (state instanceof UnDefType) {
137                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
138             } else {
139                 updateState(channelUID, state);
140             }
141             return;
142         }
143
144         if (data.isReadOnly()) {
145             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
146             return;
147         }
148
149         final CompletableFuture<Boolean> future = data.publishValue(command);
150         future.handle((v, ex) -> {
151             if (ex != null) {
152                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
153                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
154                         ex.getMessage());
155             } else {
156                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
157             }
158             return null;
159         });
160     }
161
162     @Override
163     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
164         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
165             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
166             stop();
167             connection = null;
168             return;
169         }
170         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
171             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
172             stop();
173             return;
174         }
175
176         AbstractBrokerHandler h = getBridgeHandler();
177         if (h == null) {
178             resetMessageReceived();
179             logger.warn("Bridge handler not found!");
180             return;
181         }
182
183         final MqttBrokerConnection connection;
184         try {
185             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
186         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
187             resetMessageReceived();
188             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
189                     "Bridge handler has no valid broker connection!");
190             return;
191         }
192         this.connection = connection;
193
194         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
195         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
196         // class.
197         try {
198             Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
199                 if (s != null) {
200                     return s.start(connection, scheduler, 0);
201                 }
202                 return CompletableFuture.allOf();
203             }).collect(Collectors.toList());
204
205             futures.add(start(connection));
206
207             futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
208                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
209                 return null;
210             }).get(subscribeTimeout, TimeUnit.MILLISECONDS);
211         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
212             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
213                     "Did not receive all required topics");
214         }
215     }
216
217     /**
218      * Return the bride handler. The bridge is from the "MQTT" bundle.
219      */
220     public @Nullable AbstractBrokerHandler getBridgeHandler() {
221         Bridge bridge = getBridge();
222         if (bridge == null) {
223             return null;
224         }
225         return (AbstractBrokerHandler) bridge.getHandler();
226     }
227
228     /**
229      * Return the bridge status.
230      */
231     public ThingStatusInfo getBridgeStatus() {
232         Bridge b = getBridge();
233         if (b != null) {
234             return b.getStatusInfo();
235         } else {
236             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
237         }
238     }
239
240     @Override
241     public void initialize() {
242         bridgeStatusChanged(getBridgeStatus());
243     }
244
245     @Override
246     public void handleRemoval() {
247         stop();
248         super.handleRemoval();
249     }
250
251     @Override
252     public void dispose() {
253         stop();
254         try {
255             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
256         } catch (InterruptedException | ExecutionException | TimeoutException e) {
257             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
258         }
259         connection = null;
260         super.dispose();
261     }
262
263     /**
264      * this method must unsubscribe all topics used by this thing handler
265      *
266      * @return
267      */
268     public abstract CompletableFuture<Void> unsubscribeAll();
269
270     @Override
271     public void updateChannelState(ChannelUID channelUID, State value) {
272         if (messageReceived.compareAndSet(false, true)) {
273             calculateThingStatus();
274         }
275         super.updateState(channelUID, value);
276     }
277
278     @Override
279     public void triggerChannel(ChannelUID channelUID, String event) {
280         if (messageReceived.compareAndSet(false, true)) {
281             calculateThingStatus();
282         }
283         super.triggerChannel(channelUID, event);
284     }
285
286     @Override
287     public void postChannelCommand(ChannelUID channelUID, Command command) {
288         postCommand(channelUID, command);
289     }
290
291     public @Nullable MqttBrokerConnection getConnection() {
292         return connection;
293     }
294
295     /**
296      * This is for tests only to inject a broker connection.
297      *
298      * @param connection MQTT Broker connection
299      */
300     public void setConnection(MqttBrokerConnection connection) {
301         this.connection = connection;
302     }
303
304     @Override
305     public void addAvailabilityTopic(String availability_topic, String payload_available,
306             String payload_not_available) {
307         availabilityStates.computeIfAbsent(availability_topic, topic -> {
308             Value value = new OnOffValue(payload_available, payload_not_available);
309             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availablility");
310             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
311             ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
312                     channelUID, value, new ChannelStateUpdateListener() {
313                         @Override
314                         public void updateChannelState(ChannelUID channelUID, State value) {
315                             calculateThingStatus();
316                         }
317
318                         @Override
319                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
320                         }
321
322                         @Override
323                         public void postChannelCommand(ChannelUID channelUID, Command value) {
324                         }
325                     });
326             MqttBrokerConnection connection = getConnection();
327             if (connection != null) {
328                 state.start(connection, scheduler, 0);
329             }
330
331             return state;
332         });
333     }
334
335     @Override
336     public void removeAvailabilityTopic(@NonNull String availability_topic) {
337         availabilityStates.computeIfPresent(availability_topic, (topic, state) -> {
338             if (connection != null && state != null) {
339                 state.stop();
340             }
341             return null;
342         });
343     }
344
345     @Override
346     public void clearAllAvailabilityTopics() {
347         Set<String> topics = new HashSet<>(availabilityStates.keySet());
348         topics.forEach(this::removeAvailabilityTopic);
349     }
350
351     @Override
352     public void resetMessageReceived() {
353         if (messageReceived.compareAndSet(true, false)) {
354             calculateThingStatus();
355         }
356     }
357
358     protected void calculateThingStatus() {
359         final Optional<Boolean> availabilityTopicsSeen;
360
361         if (availabilityStates.isEmpty()) {
362             availabilityTopicsSeen = Optional.empty();
363         } else {
364             availabilityTopicsSeen = Optional.of(availabilityStates.values().stream().allMatch(
365                     c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
366         }
367         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
368     }
369
370     protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);
371 }