]> git.basschouten.com Git - openhab-addons.git/blob
14ecdc8510dd29a634e5e5c0eb11e01938fa5ac1
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.HashSet;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.mqtt.generic.values.OnOffValue;
30 import org.openhab.binding.mqtt.generic.values.Value;
31 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
32 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
33 import org.openhab.core.library.types.OnOffType;
34 import org.openhab.core.thing.Bridge;
35 import org.openhab.core.thing.ChannelGroupUID;
36 import org.openhab.core.thing.ChannelUID;
37 import org.openhab.core.thing.Thing;
38 import org.openhab.core.thing.ThingStatus;
39 import org.openhab.core.thing.ThingStatusDetail;
40 import org.openhab.core.thing.ThingStatusInfo;
41 import org.openhab.core.thing.binding.BaseThingHandler;
42 import org.openhab.core.types.Command;
43 import org.openhab.core.types.RefreshType;
44 import org.openhab.core.types.State;
45 import org.openhab.core.types.UnDefType;
46 import org.openhab.core.util.UIDUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
52  * want to inherit from here.
53  *
54  * <p>
55  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
56  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
57  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
58  *
59  * <p>
60  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
61  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
62  * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
63  *
64  * <p>
65  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
66  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
67  *
68  * <p>
69  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
70  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
71  * because this base class implements {@link ChannelStateUpdateListener}.
72  *
73  * @author David Graeff - Initial contribution
74  */
75 @NonNullByDefault
76 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
77         implements ChannelStateUpdateListener, AvailabilityTracker {
78     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
79     // Timeout for the entire tree parsing and subscription
80     private final int subscribeTimeout;
81
82     protected @Nullable MqttBrokerConnection connection;
83
84     private AtomicBoolean messageReceived = new AtomicBoolean(false);
85     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
86
87     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
88         super(thing);
89         this.subscribeTimeout = subscribeTimeout;
90     }
91
92     /**
93      * Return the channel state for the given channelUID.
94      *
95      * @param channelUID The channelUID
96      * @return A channel state. May be null.
97      */
98     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
99
100     /**
101      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
102      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
103      *
104      * @param connection A started broker connection
105      * @return A future that completes normal on success and exceptionally on any errors.
106      */
107     protected abstract CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection);
108
109     /**
110      * Called when the MQTT connection disappeared.
111      * You should clean up all resources that depend on a working connection.
112      */
113     protected void stop() {
114         clearAllAvailabilityTopics();
115         resetMessageReceived();
116     }
117
118     @Override
119     public void handleCommand(ChannelUID channelUID, Command command) {
120         if (connection == null) {
121             return;
122         }
123
124         final @Nullable ChannelState data = getChannelState(channelUID);
125
126         if (data == null) {
127             logger.warn("Channel {} not supported!", channelUID);
128             return;
129         }
130
131         if (command instanceof RefreshType) {
132             State state = data.getCache().getChannelState();
133             if (state instanceof UnDefType) {
134                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
135             } else {
136                 updateState(channelUID, state);
137             }
138             return;
139         }
140
141         if (data.isReadOnly()) {
142             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
143             return;
144         }
145
146         final CompletableFuture<Boolean> future = data.publishValue(command);
147         future.handle((v, ex) -> {
148             if (ex != null) {
149                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
150                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
151                         ex.getMessage());
152             } else {
153                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
154             }
155             return null;
156         });
157     }
158
159     @Override
160     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
161         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
162             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
163             stop();
164             connection = null;
165             return;
166         }
167         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
168             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
169             stop();
170             return;
171         }
172
173         AbstractBrokerHandler h = getBridgeHandler();
174         if (h == null) {
175             resetMessageReceived();
176             logger.warn("Bridge handler not found!");
177             return;
178         }
179
180         final MqttBrokerConnection connection;
181         try {
182             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
183         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
184             resetMessageReceived();
185             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
186                     "Bridge handler has no valid broker connection!");
187             return;
188         }
189         this.connection = connection;
190
191         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
192         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
193         // class.
194         try {
195             start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
196         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
197             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
198                     "Did not receive all required topics");
199         }
200     }
201
202     /**
203      * Return the bride handler. The bridge is from the "MQTT" bundle.
204      */
205     public @Nullable AbstractBrokerHandler getBridgeHandler() {
206         Bridge bridge = getBridge();
207         if (bridge == null) {
208             return null;
209         }
210         return (AbstractBrokerHandler) bridge.getHandler();
211     }
212
213     /**
214      * Return the bridge status.
215      */
216     public ThingStatusInfo getBridgeStatus() {
217         Bridge b = getBridge();
218         if (b != null) {
219             return b.getStatusInfo();
220         } else {
221             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
222         }
223     }
224
225     @Override
226     public void initialize() {
227         bridgeStatusChanged(getBridgeStatus());
228     }
229
230     @Override
231     public void handleRemoval() {
232         stop();
233         super.handleRemoval();
234     }
235
236     @Override
237     public void dispose() {
238         stop();
239         try {
240             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
241         } catch (InterruptedException | ExecutionException | TimeoutException e) {
242             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
243         }
244         connection = null;
245         super.dispose();
246     }
247
248     /**
249      * this method must unsubscribe all topics used by this thing handler
250      *
251      * @return
252      */
253     public abstract CompletableFuture<Void> unsubscribeAll();
254
255     @Override
256     public void updateChannelState(ChannelUID channelUID, State value) {
257         if (messageReceived.compareAndSet(false, true)) {
258             calculateThingStatus();
259         }
260         super.updateState(channelUID, value);
261     }
262
263     @Override
264     public void triggerChannel(ChannelUID channelUID, String event) {
265         if (messageReceived.compareAndSet(false, true)) {
266             calculateThingStatus();
267         }
268         super.triggerChannel(channelUID, event);
269     }
270
271     @Override
272     public void postChannelCommand(ChannelUID channelUID, Command command) {
273         postCommand(channelUID, command);
274     }
275
276     public @Nullable MqttBrokerConnection getConnection() {
277         return connection;
278     }
279
280     /**
281      * This is for tests only to inject a broker connection.
282      *
283      * @param connection MQTT Broker connection
284      */
285     public void setConnection(MqttBrokerConnection connection) {
286         this.connection = connection;
287     }
288
289     @Override
290     public void addAvailabilityTopic(String availability_topic, String payload_available,
291             String payload_not_available) {
292         availabilityStates.computeIfAbsent(availability_topic, topic -> {
293             Value value = new OnOffValue(payload_available, payload_not_available);
294             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availablility");
295             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
296             ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
297                     channelUID, value, new ChannelStateUpdateListener() {
298                         @Override
299                         public void updateChannelState(ChannelUID channelUID, State value) {
300                             calculateThingStatus();
301                         }
302
303                         @Override
304                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
305                         }
306
307                         @Override
308                         public void postChannelCommand(ChannelUID channelUID, Command value) {
309                         }
310                     });
311             MqttBrokerConnection connection = getConnection();
312             if (connection != null) {
313                 state.start(connection, scheduler, 0);
314             }
315
316             return state;
317         });
318     }
319
320     @Override
321     public void removeAvailabilityTopic(@NonNull String availability_topic) {
322         availabilityStates.computeIfPresent(availability_topic, (topic, state) -> {
323             if (connection != null && state != null) {
324                 state.stop();
325             }
326             return null;
327         });
328     }
329
330     @Override
331     public void clearAllAvailabilityTopics() {
332         Set<String> topics = new HashSet<>(availabilityStates.keySet());
333         topics.forEach(this::removeAvailabilityTopic);
334     }
335
336     @Override
337     public void resetMessageReceived() {
338         if (messageReceived.compareAndSet(true, false)) {
339             calculateThingStatus();
340         }
341     }
342
343     protected void calculateThingStatus() {
344         final Optional<Boolean> availabilityTopicsSeen;
345
346         if (availabilityStates.isEmpty()) {
347             availabilityTopicsSeen = Optional.empty();
348         } else {
349             availabilityTopicsSeen = Optional.of(availabilityStates.values().stream().allMatch(
350                     c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
351         }
352         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
353     }
354
355     protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);
356 }