]> git.basschouten.com Git - openhab-addons.git/blob
ded70ee8350bdae75fdbcd02aeedde98c5e85c41
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.HashSet;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.mqtt.generic.values.OnOffValue;
29 import org.openhab.binding.mqtt.generic.values.Value;
30 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
31 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
32 import org.openhab.core.library.types.OnOffType;
33 import org.openhab.core.thing.Bridge;
34 import org.openhab.core.thing.ChannelGroupUID;
35 import org.openhab.core.thing.ChannelUID;
36 import org.openhab.core.thing.Thing;
37 import org.openhab.core.thing.ThingStatus;
38 import org.openhab.core.thing.ThingStatusDetail;
39 import org.openhab.core.thing.ThingStatusInfo;
40 import org.openhab.core.thing.binding.BaseThingHandler;
41 import org.openhab.core.types.Command;
42 import org.openhab.core.types.RefreshType;
43 import org.openhab.core.types.State;
44 import org.openhab.core.types.UnDefType;
45 import org.openhab.core.util.UIDUtils;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
51  * want to inherit from here.
52  *
53  * <p>
54  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
55  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
56  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
57  *
58  * <p>
59  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
60  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
61  * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
62  *
63  * <p>
64  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
65  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
66  *
67  * <p>
68  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
69  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
70  * because this base class implements {@link ChannelStateUpdateListener}.
71  *
72  * @author David Graeff - Initial contribution
73  */
74 @NonNullByDefault
75 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
76         implements ChannelStateUpdateListener, AvailabilityTracker {
77     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
78     // Timeout for the entire tree parsing and subscription
79     private final int subscribeTimeout;
80
81     protected @Nullable MqttBrokerConnection connection;
82
83     private AtomicBoolean messageReceived = new AtomicBoolean(false);
84     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
85
86     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
87         super(thing);
88         this.subscribeTimeout = subscribeTimeout;
89     }
90
91     /**
92      * Return the channel state for the given channelUID.
93      *
94      * @param channelUID The channelUID
95      * @return A channel state. May be null.
96      */
97     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
98
99     /**
100      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
101      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
102      *
103      * @param connection A started broker connection
104      * @return A future that completes normal on success and exceptionally on any errors.
105      */
106     protected abstract CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection);
107
108     /**
109      * Called when the MQTT connection disappeared.
110      * You should clean up all resources that depend on a working connection.
111      */
112     protected void stop() {
113         clearAllAvailabilityTopics();
114         resetMessageReceived();
115     }
116
117     @Override
118     public void handleCommand(ChannelUID channelUID, Command command) {
119         if (connection == null) {
120             return;
121         }
122
123         final @Nullable ChannelState data = getChannelState(channelUID);
124
125         if (data == null) {
126             logger.warn("Channel {} not supported!", channelUID);
127             return;
128         }
129
130         if (command instanceof RefreshType) {
131             State state = data.getCache().getChannelState();
132             if (state instanceof UnDefType) {
133                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
134             } else {
135                 updateState(channelUID, state);
136             }
137             return;
138         }
139
140         if (data.isReadOnly()) {
141             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
142             return;
143         }
144
145         final CompletableFuture<Boolean> future = data.publishValue(command);
146         future.handle((v, ex) -> {
147             if (ex != null) {
148                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
149                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
150                         ex.getMessage());
151             } else {
152                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
153             }
154             return null;
155         });
156     }
157
158     @Override
159     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
160         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
161             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
162             stop();
163             connection = null;
164             return;
165         }
166         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
167             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
168             stop();
169             return;
170         }
171
172         AbstractBrokerHandler h = getBridgeHandler();
173         if (h == null) {
174             resetMessageReceived();
175             logger.warn("Bridge handler not found!");
176             return;
177         }
178
179         final MqttBrokerConnection connection;
180         try {
181             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
182         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
183             resetMessageReceived();
184             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
185                     "Bridge handler has no valid broker connection!");
186             return;
187         }
188         this.connection = connection;
189
190         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
191         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
192         // class.
193         try {
194             start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
195         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
196             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
197                     "Did not receive all required topics");
198         }
199     }
200
201     /**
202      * Return the bride handler. The bridge is from the "MQTT" bundle.
203      */
204     public @Nullable AbstractBrokerHandler getBridgeHandler() {
205         Bridge bridge = getBridge();
206         if (bridge == null) {
207             return null;
208         }
209         return (AbstractBrokerHandler) bridge.getHandler();
210     }
211
212     /**
213      * Return the bridge status.
214      */
215     public ThingStatusInfo getBridgeStatus() {
216         Bridge b = getBridge();
217         if (b != null) {
218             return b.getStatusInfo();
219         } else {
220             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
221         }
222     }
223
224     @Override
225     public void initialize() {
226         bridgeStatusChanged(getBridgeStatus());
227     }
228
229     @Override
230     public void handleRemoval() {
231         stop();
232         super.handleRemoval();
233     }
234
235     @Override
236     public void dispose() {
237         stop();
238         try {
239             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
240         } catch (InterruptedException | ExecutionException | TimeoutException e) {
241             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
242         }
243         connection = null;
244         super.dispose();
245     }
246
247     /**
248      * this method must unsubscribe all topics used by this thing handler
249      *
250      * @return
251      */
252     public abstract CompletableFuture<Void> unsubscribeAll();
253
254     @Override
255     public void updateChannelState(ChannelUID channelUID, State value) {
256         if (messageReceived.compareAndSet(false, true)) {
257             calculateThingStatus();
258         }
259         super.updateState(channelUID, value);
260     }
261
262     @Override
263     public void triggerChannel(ChannelUID channelUID, String event) {
264         if (messageReceived.compareAndSet(false, true)) {
265             calculateThingStatus();
266         }
267         super.triggerChannel(channelUID, event);
268     }
269
270     @Override
271     public void postChannelCommand(ChannelUID channelUID, Command command) {
272         postCommand(channelUID, command);
273     }
274
275     public @Nullable MqttBrokerConnection getConnection() {
276         return connection;
277     }
278
279     /**
280      * This is for tests only to inject a broker connection.
281      *
282      * @param connection MQTT Broker connection
283      */
284     public void setConnection(MqttBrokerConnection connection) {
285         this.connection = connection;
286     }
287
288     @Override
289     public void addAvailabilityTopic(String availability_topic, String payload_available,
290             String payload_not_available) {
291         addAvailabilityTopic(availability_topic, payload_available, payload_not_available, null, null);
292     }
293
294     @Override
295     public void addAvailabilityTopic(String availability_topic, String payload_available, String payload_not_available,
296             @Nullable String transformation_pattern,
297             @Nullable TransformationServiceProvider transformationServiceProvider) {
298         availabilityStates.computeIfAbsent(availability_topic, topic -> {
299             Value value = new OnOffValue(payload_available, payload_not_available);
300             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availability");
301             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
302             ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
303                     channelUID, value, new ChannelStateUpdateListener() {
304                         @Override
305                         public void updateChannelState(ChannelUID channelUID, State value) {
306                             calculateThingStatus();
307                         }
308
309                         @Override
310                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
311                         }
312
313                         @Override
314                         public void postChannelCommand(ChannelUID channelUID, Command value) {
315                         }
316                     });
317             if (transformation_pattern != null && transformationServiceProvider != null) {
318                 state.addTransformation(transformation_pattern, transformationServiceProvider);
319             }
320             MqttBrokerConnection connection = getConnection();
321             if (connection != null) {
322                 state.start(connection, scheduler, 0);
323             }
324
325             return state;
326         });
327     }
328
329     @Override
330     public void removeAvailabilityTopic(String availabilityTopic) {
331         availabilityStates.computeIfPresent(availabilityTopic, (topic, state) -> {
332             if (connection != null && state != null) {
333                 state.stop();
334             }
335             return null;
336         });
337     }
338
339     @Override
340     public void clearAllAvailabilityTopics() {
341         Set<String> topics = new HashSet<>(availabilityStates.keySet());
342         topics.forEach(this::removeAvailabilityTopic);
343     }
344
345     @Override
346     public void resetMessageReceived() {
347         if (messageReceived.compareAndSet(true, false)) {
348             calculateThingStatus();
349         }
350     }
351
352     protected void calculateThingStatus() {
353         final Optional<Boolean> availabilityTopicsSeen;
354
355         if (availabilityStates.isEmpty()) {
356             availabilityTopicsSeen = Optional.empty();
357         } else {
358             availabilityTopicsSeen = Optional.of(availabilityStates.values().stream().allMatch(
359                     c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
360         }
361         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
362     }
363
364     protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);
365 }