]> git.basschouten.com Git - openhab-addons.git/blob
ef47ebc22da17cbe3da71438d7b9cc220c030684
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.HashSet;
16 import java.util.Map;
17 import java.util.Optional;
18 import java.util.Set;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
29 import org.openhab.binding.mqtt.generic.values.OnOffValue;
30 import org.openhab.binding.mqtt.generic.values.Value;
31 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
32 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
33 import org.openhab.core.library.types.OnOffType;
34 import org.openhab.core.thing.Bridge;
35 import org.openhab.core.thing.ChannelGroupUID;
36 import org.openhab.core.thing.ChannelUID;
37 import org.openhab.core.thing.Thing;
38 import org.openhab.core.thing.ThingStatus;
39 import org.openhab.core.thing.ThingStatusDetail;
40 import org.openhab.core.thing.ThingStatusInfo;
41 import org.openhab.core.thing.binding.BaseThingHandler;
42 import org.openhab.core.types.Command;
43 import org.openhab.core.types.RefreshType;
44 import org.openhab.core.types.State;
45 import org.openhab.core.types.UnDefType;
46 import org.openhab.core.util.UIDUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
52  * want to inherit from here.
53  *
54  * <p>
55  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
56  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
57  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
58  *
59  * <p>
60  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
61  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
62  * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
63  *
64  * <p>
65  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
66  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
67  *
68  * <p>
69  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
70  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
71  * because this base class implements {@link ChannelStateUpdateListener}.
72  *
73  * @author David Graeff - Initial contribution
74  */
75 @NonNullByDefault
76 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
77         implements ChannelStateUpdateListener, AvailabilityTracker {
78     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
79     // Timeout for the entire tree parsing and subscription
80     private final int subscribeTimeout;
81
82     protected @Nullable MqttBrokerConnection connection;
83
84     private AtomicBoolean messageReceived = new AtomicBoolean(false);
85     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
86     private AvailabilityMode availabilityMode = AvailabilityMode.ALL;
87
88     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
89         super(thing);
90         this.subscribeTimeout = subscribeTimeout;
91     }
92
93     /**
94      * Return the channel state for the given channelUID.
95      *
96      * @param channelUID The channelUID
97      * @return A channel state. May be null.
98      */
99     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
100
101     /**
102      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
103      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
104      *
105      * @param connection A started broker connection
106      * @return A future that completes normal on success and exceptionally on any errors.
107      */
108     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
109         return availabilityStates.values().stream().map(cChannel -> {
110             final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
111                     : cChannel.start(connection, scheduler, 0);
112             return fut;
113         }).collect(FutureCollector.allOf());
114     }
115
116     /**
117      * Called when the MQTT connection disappeared.
118      * You should clean up all resources that depend on a working connection.
119      */
120     protected void stop() {
121         clearAllAvailabilityTopics();
122         resetMessageReceived();
123     }
124
125     @Override
126     public void handleCommand(ChannelUID channelUID, Command command) {
127         if (connection == null) {
128             return;
129         }
130
131         final @Nullable ChannelState data = getChannelState(channelUID);
132
133         if (data == null) {
134             logger.warn("Channel {} not supported!", channelUID);
135             return;
136         }
137
138         if (command instanceof RefreshType) {
139             State state = data.getCache().getChannelState();
140             if (state instanceof UnDefType) {
141                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
142             } else {
143                 updateState(channelUID, state);
144             }
145             return;
146         }
147
148         if (data.isReadOnly()) {
149             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
150             return;
151         }
152
153         final CompletableFuture<Boolean> future = data.publishValue(command);
154         future.handle((v, ex) -> {
155             if (ex != null) {
156                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
157                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
158                         ex.getMessage());
159             } else {
160                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
161             }
162             return null;
163         });
164     }
165
166     @Override
167     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
168         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
169             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
170             stop();
171             connection = null;
172             return;
173         }
174         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
175             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
176             stop();
177             return;
178         }
179
180         AbstractBrokerHandler h = getBridgeHandler();
181         if (h == null) {
182             resetMessageReceived();
183             logger.warn("Bridge handler not found!");
184             return;
185         }
186
187         final MqttBrokerConnection connection;
188         try {
189             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
190         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
191             resetMessageReceived();
192             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
193                     "Bridge handler has no valid broker connection!");
194             return;
195         }
196         this.connection = connection;
197
198         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
199         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
200         // class.
201         try {
202             start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
203         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
204             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
205                     "Did not receive all required topics");
206         }
207     }
208
209     /**
210      * Return the bride handler. The bridge is from the "MQTT" bundle.
211      */
212     public @Nullable AbstractBrokerHandler getBridgeHandler() {
213         Bridge bridge = getBridge();
214         if (bridge == null) {
215             return null;
216         }
217         return (AbstractBrokerHandler) bridge.getHandler();
218     }
219
220     /**
221      * Return the bridge status.
222      */
223     public ThingStatusInfo getBridgeStatus() {
224         Bridge b = getBridge();
225         if (b != null) {
226             return b.getStatusInfo();
227         } else {
228             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
229         }
230     }
231
232     @Override
233     public void initialize() {
234         bridgeStatusChanged(getBridgeStatus());
235     }
236
237     @Override
238     public void handleRemoval() {
239         stop();
240         super.handleRemoval();
241     }
242
243     @Override
244     public void dispose() {
245         stop();
246         try {
247             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
248         } catch (InterruptedException | ExecutionException | TimeoutException e) {
249             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
250         }
251         connection = null;
252         super.dispose();
253     }
254
255     /**
256      * this method must unsubscribe all topics used by this thing handler
257      *
258      * @return
259      */
260     public abstract CompletableFuture<Void> unsubscribeAll();
261
262     @Override
263     public void updateChannelState(ChannelUID channelUID, State value) {
264         if (messageReceived.compareAndSet(false, true)) {
265             calculateAndUpdateThingStatus(true);
266         }
267         super.updateState(channelUID, value);
268     }
269
270     @Override
271     public void triggerChannel(ChannelUID channelUID, String event) {
272         if (messageReceived.compareAndSet(false, true)) {
273             calculateAndUpdateThingStatus(true);
274         }
275         super.triggerChannel(channelUID, event);
276     }
277
278     @Override
279     public void postChannelCommand(ChannelUID channelUID, Command command) {
280         postCommand(channelUID, command);
281     }
282
283     public @Nullable MqttBrokerConnection getConnection() {
284         return connection;
285     }
286
287     /**
288      * This is for tests only to inject a broker connection.
289      *
290      * @param connection MQTT Broker connection
291      */
292     public void setConnection(MqttBrokerConnection connection) {
293         this.connection = connection;
294     }
295
296     @Override
297     public void setAvailabilityMode(AvailabilityMode mode) {
298         this.availabilityMode = mode;
299     }
300
301     @Override
302     public void addAvailabilityTopic(String availability_topic, String payload_available,
303             String payload_not_available) {
304         addAvailabilityTopic(availability_topic, payload_available, payload_not_available, null, null);
305     }
306
307     @Override
308     public void addAvailabilityTopic(String availability_topic, String payload_available, String payload_not_available,
309             @Nullable String transformation_pattern,
310             @Nullable TransformationServiceProvider transformationServiceProvider) {
311         availabilityStates.computeIfAbsent(availability_topic, topic -> {
312             Value value = new OnOffValue(payload_available, payload_not_available);
313             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availability");
314             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
315             ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
316                     channelUID, value, new ChannelStateUpdateListener() {
317                         @Override
318                         public void updateChannelState(ChannelUID channelUID, State value) {
319                             boolean online = value.equals(OnOffType.ON);
320                             calculateAndUpdateThingStatus(online);
321                         }
322
323                         @Override
324                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
325                         }
326
327                         @Override
328                         public void postChannelCommand(ChannelUID channelUID, Command value) {
329                         }
330                     });
331             if (transformation_pattern != null && transformationServiceProvider != null) {
332                 state.addTransformation(transformation_pattern, transformationServiceProvider);
333             }
334             MqttBrokerConnection connection = getConnection();
335             if (connection != null) {
336                 state.start(connection, scheduler, 0);
337             }
338
339             return state;
340         });
341     }
342
343     @Override
344     public void removeAvailabilityTopic(String availabilityTopic) {
345         availabilityStates.computeIfPresent(availabilityTopic, (topic, state) -> {
346             if (connection != null && state != null) {
347                 state.stop();
348             }
349             return null;
350         });
351     }
352
353     @Override
354     public void clearAllAvailabilityTopics() {
355         Set<String> topics = new HashSet<>(availabilityStates.keySet());
356         topics.forEach(this::removeAvailabilityTopic);
357     }
358
359     @Override
360     public void resetMessageReceived() {
361         if (messageReceived.compareAndSet(true, false)) {
362             calculateAndUpdateThingStatus(false);
363         }
364     }
365
366     protected void calculateAndUpdateThingStatus(boolean lastValue) {
367         final Optional<Boolean> availabilityTopicsSeen;
368
369         if (availabilityStates.isEmpty()) {
370             availabilityTopicsSeen = Optional.empty();
371         } else {
372             availabilityTopicsSeen = switch (availabilityMode) {
373                 case ALL -> Optional.of(availabilityStates.values().stream().allMatch(
374                         c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
375                 case ANY -> Optional.of(availabilityStates.values().stream().anyMatch(
376                         c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
377                 case LATEST -> Optional.of(lastValue);
378             };
379         }
380         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
381     }
382
383     protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);
384 }