]> git.basschouten.com Git - openhab-addons.git/blob
4c9dbc46913e5c274721755aed47038d0405ed4a
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.Collection;
16 import java.util.HashSet;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.stream.Collectors;
26
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.NonNullByDefault;
29 import org.eclipse.jdt.annotation.Nullable;
30 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
31 import org.openhab.binding.mqtt.generic.values.OnOffValue;
32 import org.openhab.binding.mqtt.generic.values.Value;
33 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
34 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
35 import org.openhab.core.library.types.OnOffType;
36 import org.openhab.core.thing.Bridge;
37 import org.openhab.core.thing.ChannelGroupUID;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.openhab.core.thing.ThingStatusInfo;
43 import org.openhab.core.thing.binding.BaseThingHandler;
44 import org.openhab.core.types.Command;
45 import org.openhab.core.types.RefreshType;
46 import org.openhab.core.types.State;
47 import org.openhab.core.types.UnDefType;
48 import org.openhab.core.util.UIDUtils;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
54  * want to inherit from here.
55  *
56  * <p>
57  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
58  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
59  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
60  *
61  * <p>
62  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
63  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
64  * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
65  *
66  * <p>
67  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
68  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
69  *
70  * <p>
71  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
72  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
73  * because this base class implements {@link ChannelStateUpdateListener}.
74  *
75  * @author David Graeff - Initial contribution
76  */
77 @NonNullByDefault
78 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
79         implements ChannelStateUpdateListener, AvailabilityTracker {
80     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
81     // Timeout for the entire tree parsing and subscription
82     private final int subscribeTimeout;
83
84     protected @Nullable MqttBrokerConnection connection;
85
86     private AtomicBoolean messageReceived = new AtomicBoolean(false);
87     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
88
89     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
90         super(thing);
91         this.subscribeTimeout = subscribeTimeout;
92     }
93
94     /**
95      * Return the channel state for the given channelUID.
96      *
97      * @param channelUID The channelUID
98      * @return A channel state. May be null.
99      */
100     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
101
102     /**
103      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
104      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
105      *
106      * @param connection A started broker connection
107      * @return A future that completes normal on success and exceptionally on any errors.
108      */
109     protected abstract CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection);
110
111     /**
112      * Called when the MQTT connection disappeared.
113      * You should clean up all resources that depend on a working connection.
114      */
115     protected void stop() {
116         clearAllAvailabilityTopics();
117         resetMessageReceived();
118     }
119
120     @Override
121     public void handleCommand(ChannelUID channelUID, Command command) {
122         if (connection == null) {
123             return;
124         }
125
126         final @Nullable ChannelState data = getChannelState(channelUID);
127
128         if (data == null) {
129             logger.warn("Channel {} not supported!", channelUID);
130             return;
131         }
132
133         if (command instanceof RefreshType) {
134             State state = data.getCache().getChannelState();
135             if (state instanceof UnDefType) {
136                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
137             } else {
138                 updateState(channelUID, state);
139             }
140             return;
141         }
142
143         if (data.isReadOnly()) {
144             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
145             return;
146         }
147
148         final CompletableFuture<Boolean> future = data.publishValue(command);
149         future.handle((v, ex) -> {
150             if (ex != null) {
151                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
152                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
153                         ex.getMessage());
154             } else {
155                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
156             }
157             return null;
158         });
159     }
160
161     @Override
162     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
163         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
164             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
165             stop();
166             connection = null;
167             return;
168         }
169         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
170             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
171             stop();
172             return;
173         }
174
175         AbstractBrokerHandler h = getBridgeHandler();
176         if (h == null) {
177             resetMessageReceived();
178             logger.warn("Bridge handler not found!");
179             return;
180         }
181
182         final MqttBrokerConnection connection;
183         try {
184             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
185         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
186             resetMessageReceived();
187             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
188                     "Bridge handler has no valid broker connection!");
189             return;
190         }
191         this.connection = connection;
192
193         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
194         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
195         // class.
196         try {
197             Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
198                 if (s != null) {
199                     return s.start(connection, scheduler, 0);
200                 }
201                 return CompletableFuture.allOf();
202             }).collect(Collectors.toList());
203
204             futures.add(start(connection));
205
206             futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
207                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
208                 return null;
209             }).get(subscribeTimeout, TimeUnit.MILLISECONDS);
210         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
211             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
212                     "Did not receive all required topics");
213         }
214     }
215
216     /**
217      * Return the bride handler. The bridge is from the "MQTT" bundle.
218      */
219     public @Nullable AbstractBrokerHandler getBridgeHandler() {
220         Bridge bridge = getBridge();
221         if (bridge == null) {
222             return null;
223         }
224         return (AbstractBrokerHandler) bridge.getHandler();
225     }
226
227     /**
228      * Return the bridge status.
229      */
230     public ThingStatusInfo getBridgeStatus() {
231         Bridge b = getBridge();
232         if (b != null) {
233             return b.getStatusInfo();
234         } else {
235             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
236         }
237     }
238
239     @Override
240     public void initialize() {
241         bridgeStatusChanged(getBridgeStatus());
242     }
243
244     @Override
245     public void handleRemoval() {
246         stop();
247         super.handleRemoval();
248     }
249
250     @Override
251     public void dispose() {
252         stop();
253         try {
254             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
255         } catch (InterruptedException | ExecutionException | TimeoutException e) {
256             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
257         }
258         connection = null;
259         super.dispose();
260     }
261
262     /**
263      * this method must unsubscribe all topics used by this thing handler
264      *
265      * @return
266      */
267     public abstract CompletableFuture<Void> unsubscribeAll();
268
269     @Override
270     public void updateChannelState(ChannelUID channelUID, State value) {
271         if (messageReceived.compareAndSet(false, true)) {
272             calculateThingStatus();
273         }
274         super.updateState(channelUID, value);
275     }
276
277     @Override
278     public void triggerChannel(ChannelUID channelUID, String event) {
279         if (messageReceived.compareAndSet(false, true)) {
280             calculateThingStatus();
281         }
282         super.triggerChannel(channelUID, event);
283     }
284
285     @Override
286     public void postChannelCommand(ChannelUID channelUID, Command command) {
287         postCommand(channelUID, command);
288     }
289
290     public @Nullable MqttBrokerConnection getConnection() {
291         return connection;
292     }
293
294     /**
295      * This is for tests only to inject a broker connection.
296      *
297      * @param connection MQTT Broker connection
298      */
299     public void setConnection(MqttBrokerConnection connection) {
300         this.connection = connection;
301     }
302
303     @Override
304     public void addAvailabilityTopic(String availability_topic, String payload_available,
305             String payload_not_available) {
306         availabilityStates.computeIfAbsent(availability_topic, topic -> {
307             Value value = new OnOffValue(payload_available, payload_not_available);
308             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availablility");
309             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
310             ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
311                     channelUID, value, new ChannelStateUpdateListener() {
312                         @Override
313                         public void updateChannelState(ChannelUID channelUID, State value) {
314                             calculateThingStatus();
315                         }
316
317                         @Override
318                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
319                         }
320
321                         @Override
322                         public void postChannelCommand(ChannelUID channelUID, Command value) {
323                         }
324                     });
325             MqttBrokerConnection connection = getConnection();
326             if (connection != null) {
327                 state.start(connection, scheduler, 0);
328             }
329
330             return state;
331         });
332     }
333
334     @Override
335     public void removeAvailabilityTopic(@NonNull String availability_topic) {
336         availabilityStates.computeIfPresent(availability_topic, (topic, state) -> {
337             if (connection != null && state != null) {
338                 state.stop();
339             }
340             return null;
341         });
342     }
343
344     @Override
345     public void clearAllAvailabilityTopics() {
346         Set<String> topics = new HashSet<>(availabilityStates.keySet());
347         topics.forEach(this::removeAvailabilityTopic);
348     }
349
350     @Override
351     public void resetMessageReceived() {
352         if (messageReceived.compareAndSet(true, false)) {
353             calculateThingStatus();
354         }
355     }
356
357     protected void calculateThingStatus() {
358         final boolean availabilityTopicsSeen;
359
360         if (availabilityStates.isEmpty()) {
361             availabilityTopicsSeen = true;
362         } else {
363             availabilityTopicsSeen = availabilityStates.values().stream().allMatch(
364                     c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class)));
365         }
366         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
367     }
368
369     protected abstract void updateThingStatus(boolean messageReceived, boolean availabilityTopicsSeen);
370 }