]> git.basschouten.com Git - openhab-addons.git/blob
b43f8fa4cb05232a0ea6feb75ee9b98db8392b43
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.util.HashSet;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.CompletableFuture;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.eclipse.jdt.annotation.NonNullByDefault;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.openhab.binding.mqtt.generic.utils.FutureCollector;
30 import org.openhab.binding.mqtt.generic.values.OnOffValue;
31 import org.openhab.binding.mqtt.generic.values.Value;
32 import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
33 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
34 import org.openhab.core.library.types.OnOffType;
35 import org.openhab.core.thing.Bridge;
36 import org.openhab.core.thing.ChannelGroupUID;
37 import org.openhab.core.thing.ChannelUID;
38 import org.openhab.core.thing.Thing;
39 import org.openhab.core.thing.ThingStatus;
40 import org.openhab.core.thing.ThingStatusDetail;
41 import org.openhab.core.thing.ThingStatusInfo;
42 import org.openhab.core.thing.binding.BaseThingHandler;
43 import org.openhab.core.types.Command;
44 import org.openhab.core.types.RefreshType;
45 import org.openhab.core.types.State;
46 import org.openhab.core.types.UnDefType;
47 import org.openhab.core.util.UIDUtils;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
53  * want to inherit from here.
54  *
55  * <p>
56  * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
57  * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
58  * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
59  *
60  * <p>
61  * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
62  * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
63  * where any changes to the {@link ChannelState} are sent to ("MQTT command topic").
64  *
65  * <p>
66  * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
67  * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
68  *
69  * <p>
70  * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
71  * While setting up your {@link ChannelState} you would set the callback to your thing handler,
72  * because this base class implements {@link ChannelStateUpdateListener}.
73  *
74  * @author David Graeff - Initial contribution
75  */
76 @NonNullByDefault
77 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
78         implements ChannelStateUpdateListener, AvailabilityTracker {
79     private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
80     // Timeout for the entire tree parsing and subscription
81     private final int subscribeTimeout;
82
83     protected @Nullable MqttBrokerConnection connection;
84
85     private AtomicBoolean messageReceived = new AtomicBoolean(false);
86     private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
87     private AvailabilityMode availabilityMode = AvailabilityMode.ALL;
88
89     public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
90         super(thing);
91         this.subscribeTimeout = subscribeTimeout;
92     }
93
94     /**
95      * Return the channel state for the given channelUID.
96      *
97      * @param channelUID The channelUID
98      * @return A channel state. May be null.
99      */
100     public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
101
102     /**
103      * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
104      * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
105      *
106      * @param connection A started broker connection
107      * @return A future that completes normal on success and exceptionally on any errors.
108      */
109     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
110         return availabilityStates.values().stream().map(cChannel -> {
111             final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
112                     : cChannel.start(connection, scheduler, 0);
113             return fut;
114         }).collect(FutureCollector.allOf());
115     }
116
117     /**
118      * Called when the MQTT connection disappeared.
119      * You should clean up all resources that depend on a working connection.
120      */
121     protected void stop() {
122         clearAllAvailabilityTopics();
123         resetMessageReceived();
124     }
125
126     @Override
127     public void handleCommand(ChannelUID channelUID, Command command) {
128         if (connection == null) {
129             return;
130         }
131
132         final @Nullable ChannelState data = getChannelState(channelUID);
133
134         if (data == null) {
135             logger.warn("Channel {} not supported!", channelUID);
136             return;
137         }
138
139         if (command instanceof RefreshType) {
140             State state = data.getCache().getChannelState();
141             if (state instanceof UnDefType) {
142                 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
143             } else {
144                 updateState(channelUID, state);
145             }
146             return;
147         }
148
149         if (data.isReadOnly()) {
150             logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
151             return;
152         }
153
154         final CompletableFuture<Boolean> future = data.publishValue(command);
155         future.handle((v, ex) -> {
156             if (ex != null) {
157                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
158                 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
159                         ex.getMessage());
160             } else {
161                 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
162             }
163             return null;
164         });
165     }
166
167     @Override
168     public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
169         if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
170             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
171             stop();
172             connection = null;
173             return;
174         }
175         if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
176             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
177             stop();
178             return;
179         }
180
181         AbstractBrokerHandler h = getBridgeHandler();
182         if (h == null) {
183             resetMessageReceived();
184             logger.warn("Bridge handler not found!");
185             return;
186         }
187
188         final MqttBrokerConnection connection;
189         try {
190             connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
191         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
192             resetMessageReceived();
193             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
194                     "Bridge handler has no valid broker connection!");
195             return;
196         }
197         this.connection = connection;
198
199         // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
200         // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
201         // class.
202         try {
203             start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
204         } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
205             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
206                     "Did not receive all required topics");
207         }
208     }
209
210     /**
211      * Return the bride handler. The bridge is from the "MQTT" bundle.
212      */
213     public @Nullable AbstractBrokerHandler getBridgeHandler() {
214         Bridge bridge = getBridge();
215         if (bridge == null) {
216             return null;
217         }
218         return (AbstractBrokerHandler) bridge.getHandler();
219     }
220
221     /**
222      * Return the bridge status.
223      */
224     public ThingStatusInfo getBridgeStatus() {
225         Bridge b = getBridge();
226         if (b != null) {
227             return b.getStatusInfo();
228         } else {
229             return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
230         }
231     }
232
233     @Override
234     public void initialize() {
235         bridgeStatusChanged(getBridgeStatus());
236     }
237
238     @Override
239     public void handleRemoval() {
240         stop();
241         super.handleRemoval();
242     }
243
244     @Override
245     public void dispose() {
246         stop();
247         try {
248             unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
249         } catch (InterruptedException | ExecutionException | TimeoutException e) {
250             logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
251         }
252         connection = null;
253         super.dispose();
254     }
255
256     /**
257      * this method must unsubscribe all topics used by this thing handler
258      *
259      * @return
260      */
261     public abstract CompletableFuture<Void> unsubscribeAll();
262
263     @Override
264     public void updateChannelState(ChannelUID channelUID, State value) {
265         if (messageReceived.compareAndSet(false, true)) {
266             calculateAndUpdateThingStatus(true);
267         }
268         super.updateState(channelUID, value);
269     }
270
271     @Override
272     public void triggerChannel(ChannelUID channelUID, String event) {
273         if (messageReceived.compareAndSet(false, true)) {
274             calculateAndUpdateThingStatus(true);
275         }
276         super.triggerChannel(channelUID, event);
277     }
278
279     @Override
280     public void postChannelCommand(ChannelUID channelUID, Command command) {
281         postCommand(channelUID, command);
282     }
283
284     public @Nullable MqttBrokerConnection getConnection() {
285         return connection;
286     }
287
288     /**
289      * This is for tests only to inject a broker connection.
290      *
291      * @param connection MQTT Broker connection
292      */
293     public void setConnection(MqttBrokerConnection connection) {
294         this.connection = connection;
295     }
296
297     @Override
298     public void setAvailabilityMode(AvailabilityMode mode) {
299         this.availabilityMode = mode;
300     }
301
302     @Override
303     public void addAvailabilityTopic(String availability_topic, String payload_available,
304             String payload_not_available) {
305         addAvailabilityTopic(availability_topic, payload_available, payload_not_available, List.of());
306     }
307
308     @Override
309     public void addAvailabilityTopic(String availability_topic, String payload_available, String payload_not_available,
310             List<String> transformation_pattern) {
311         availabilityStates.computeIfAbsent(availability_topic, topic -> {
312             Value value = new OnOffValue(payload_available, payload_not_available);
313             ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availability");
314             ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
315             ChannelState state = new ChannelState(
316                     ChannelConfigBuilder.create().withStateTopic(topic)
317                             .withTransformationPattern(transformation_pattern).build(),
318                     channelUID, value, new ChannelStateUpdateListener() {
319                         @Override
320                         public void updateChannelState(ChannelUID channelUID, State value) {
321                             boolean online = value.equals(OnOffType.ON);
322                             calculateAndUpdateThingStatus(online);
323                         }
324
325                         @Override
326                         public void triggerChannel(ChannelUID channelUID, String eventPayload) {
327                         }
328
329                         @Override
330                         public void postChannelCommand(ChannelUID channelUID, Command value) {
331                         }
332                     });
333             MqttBrokerConnection connection = getConnection();
334             if (connection != null) {
335                 state.start(connection, scheduler, 0);
336             }
337
338             return state;
339         });
340     }
341
342     @Override
343     public void removeAvailabilityTopic(String availabilityTopic) {
344         availabilityStates.computeIfPresent(availabilityTopic, (topic, state) -> {
345             if (connection != null && state != null) {
346                 state.stop();
347             }
348             return null;
349         });
350     }
351
352     @Override
353     public void clearAllAvailabilityTopics() {
354         Set<String> topics = new HashSet<>(availabilityStates.keySet());
355         topics.forEach(this::removeAvailabilityTopic);
356     }
357
358     @Override
359     public void resetMessageReceived() {
360         if (messageReceived.compareAndSet(true, false)) {
361             calculateAndUpdateThingStatus(false);
362         }
363     }
364
365     protected void calculateAndUpdateThingStatus(boolean lastValue) {
366         final Optional<Boolean> availabilityTopicsSeen;
367
368         if (availabilityStates.isEmpty()) {
369             availabilityTopicsSeen = Optional.empty();
370         } else {
371             availabilityTopicsSeen = switch (availabilityMode) {
372                 case ALL -> Optional.of(availabilityStates.values().stream().allMatch(
373                         c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
374                 case ANY -> Optional.of(availabilityStates.values().stream().anyMatch(
375                         c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
376                 case LATEST -> Optional.of(lastValue);
377             };
378         }
379         updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
380     }
381
382     protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);
383 }