/**
 * Copyright (c) 2010-2024 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.mqtt.generic;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.library.types.OnOffType;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ChannelGroupUID;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.ThingStatusInfo;
import org.openhab.core.thing.binding.BaseThingHandler;
import org.openhab.core.types.Command;
import org.openhab.core.types.RefreshType;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
import org.openhab.core.util.UIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
51 * Base class for MQTT thing handlers. If you are going to implement an MQTT convention, you probably
52 * want to inherit from here.
55 * This base class will make sure you get a working {@link MqttBrokerConnection}, you will be informed
56 * when to start your subscriptions ({@link #start(MqttBrokerConnection)}) and when to free your resources
57 * because of a lost connection ({@link AbstractMQTTThingHandler#stop()}).
60 * If you inherit from this base class, you must use {@link ChannelState} to (a) keep a cached channel value,
61 * (b) to link a MQTT topic value to a channel value ("MQTT state topic") and (c) to have a secondary MQTT topic
62 * where any changes to the {@link ChannelState} are send to ("MQTT command topic").
65 * You are expected to keep your channel data structure organized in a way, to resolve a {@link ChannelUID} to
66 * the corresponding {@link ChannelState} in {@link #getChannelState(ChannelUID)}.
69 * To inform the framework of changed values, received via MQTT, a {@link ChannelState} calls a listener callback.
70 * While setting up your {@link ChannelState} you would set the callback to your thing handler,
71 * because this base class implements {@link ChannelStateUpdateListener}.
73 * @author David Graeff - Initial contribution
76 public abstract class AbstractMQTTThingHandler extends BaseThingHandler
77 implements ChannelStateUpdateListener, AvailabilityTracker {
78 private final Logger logger = LoggerFactory.getLogger(AbstractMQTTThingHandler.class);
79 // Timeout for the entire tree parsing and subscription
80 private final int subscribeTimeout;
82 protected @Nullable MqttBrokerConnection connection;
84 private AtomicBoolean messageReceived = new AtomicBoolean(false);
85 private Map<String, @Nullable ChannelState> availabilityStates = new ConcurrentHashMap<>();
86 private AvailabilityMode availabilityMode = AvailabilityMode.ALL;
/**
 * Creates the handler for the given thing.
 *
 * @param thing The thing this handler is responsible for
 * @param subscribeTimeout Timeout in milliseconds that {@link #bridgeStatusChanged(ThingStatusInfo)} waits
 *            for {@link #start(MqttBrokerConnection)} to complete
 */
public AbstractMQTTThingHandler(Thing thing, int subscribeTimeout) {
    super(thing);
    this.subscribeTimeout = subscribeTimeout;
}
94 * Return the channel state for the given channelUID.
96 * @param channelUID The channelUID
97 * @return A channel state. May be null.
99 public abstract @Nullable ChannelState getChannelState(ChannelUID channelUID);
102 * Start the topic discovery and subscribe to all channel state topics on all {@link ChannelState}s.
103 * Put the thing ONLINE on success otherwise complete the returned future exceptionally.
105 * @param connection A started broker connection
106 * @return A future that completes normal on success and exceptionally on any errors.
108 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
109 return availabilityStates.values().stream().map(cChannel -> {
110 final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
111 : cChannel.start(connection, scheduler, 0);
113 }).collect(FutureCollector.allOf());
117 * Called when the MQTT connection disappeared.
118 * You should clean up all resources that depend on a working connection.
120 protected void stop() {
121 clearAllAvailabilityTopics();
122 resetMessageReceived();
126 public void handleCommand(ChannelUID channelUID, Command command) {
127 if (connection == null) {
131 final @Nullable ChannelState data = getChannelState(channelUID);
134 logger.warn("Channel {} not supported!", channelUID);
138 if (command instanceof RefreshType) {
139 State state = data.getCache().getChannelState();
140 if (state instanceof UnDefType) {
141 logger.debug("Channel {} received REFRESH but no value cached, ignoring", channelUID);
143 updateState(channelUID, state);
148 if (data.isReadOnly()) {
149 logger.trace("Channel {} is a read-only channel, ignoring command {}", channelUID, command);
153 final CompletableFuture<Boolean> future = data.publishValue(command);
154 future.handle((v, ex) -> {
156 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, ex.getLocalizedMessage());
157 logger.debug("Failed publishing value {} to topic {}: {}", command, data.getCommandTopic(),
160 logger.debug("Successfully published value {} to topic {}", command, data.getCommandTopic());
167 public void bridgeStatusChanged(ThingStatusInfo bridgeStatusInfo) {
168 if (bridgeStatusInfo.getStatus() == ThingStatus.OFFLINE) {
169 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE);
174 if (bridgeStatusInfo.getStatus() != ThingStatus.ONLINE) {
175 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR);
180 AbstractBrokerHandler h = getBridgeHandler();
182 resetMessageReceived();
183 logger.warn("Bridge handler not found!");
187 final MqttBrokerConnection connection;
189 connection = h.getConnectionAsync().get(500, TimeUnit.MILLISECONDS);
190 } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
191 resetMessageReceived();
192 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_UNINITIALIZED,
193 "Bridge handler has no valid broker connection!");
196 this.connection = connection;
198 // Start up (subscribe to MQTT topics). Limit with a timeout and catch exceptions.
199 // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
202 start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
203 } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
204 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
205 "Did not receive all required topics");
210 * Return the bride handler. The bridge is from the "MQTT" bundle.
212 public @Nullable AbstractBrokerHandler getBridgeHandler() {
213 Bridge bridge = getBridge();
214 if (bridge == null) {
217 return (AbstractBrokerHandler) bridge.getHandler();
221 * Return the bridge status.
223 public ThingStatusInfo getBridgeStatus() {
224 Bridge b = getBridge();
226 return b.getStatusInfo();
228 return new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null);
233 public void initialize() {
234 bridgeStatusChanged(getBridgeStatus());
238 public void handleRemoval() {
240 super.handleRemoval();
244 public void dispose() {
247 unsubscribeAll().get(500, TimeUnit.MILLISECONDS);
248 } catch (InterruptedException | ExecutionException | TimeoutException e) {
249 logger.warn("unsubscription on disposal failed for {}: ", thing.getUID(), e);
256 * this method must unsubscribe all topics used by this thing handler
260 public abstract CompletableFuture<Void> unsubscribeAll();
263 public void updateChannelState(ChannelUID channelUID, State value) {
264 if (messageReceived.compareAndSet(false, true)) {
265 calculateAndUpdateThingStatus(true);
267 super.updateState(channelUID, value);
271 public void triggerChannel(ChannelUID channelUID, String event) {
272 if (messageReceived.compareAndSet(false, true)) {
273 calculateAndUpdateThingStatus(true);
275 super.triggerChannel(channelUID, event);
279 public void postChannelCommand(ChannelUID channelUID, Command command) {
280 postCommand(channelUID, command);
283 public @Nullable MqttBrokerConnection getConnection() {
288 * This is for tests only to inject a broker connection.
290 * @param connection MQTT Broker connection
292 public void setConnection(MqttBrokerConnection connection) {
293 this.connection = connection;
297 public void setAvailabilityMode(AvailabilityMode mode) {
298 this.availabilityMode = mode;
302 public void addAvailabilityTopic(String availability_topic, String payload_available,
303 String payload_not_available) {
304 addAvailabilityTopic(availability_topic, payload_available, payload_not_available, null, null);
308 public void addAvailabilityTopic(String availability_topic, String payload_available, String payload_not_available,
309 @Nullable String transformation_pattern,
310 @Nullable TransformationServiceProvider transformationServiceProvider) {
311 availabilityStates.computeIfAbsent(availability_topic, topic -> {
312 Value value = new OnOffValue(payload_available, payload_not_available);
313 ChannelGroupUID groupUID = new ChannelGroupUID(getThing().getUID(), "availability");
314 ChannelUID channelUID = new ChannelUID(groupUID, UIDUtils.encode(topic));
315 ChannelState state = new ChannelState(ChannelConfigBuilder.create().withStateTopic(topic).build(),
316 channelUID, value, new ChannelStateUpdateListener() {
318 public void updateChannelState(ChannelUID channelUID, State value) {
319 boolean online = value.equals(OnOffType.ON);
320 calculateAndUpdateThingStatus(online);
324 public void triggerChannel(ChannelUID channelUID, String eventPayload) {
328 public void postChannelCommand(ChannelUID channelUID, Command value) {
331 if (transformation_pattern != null && transformationServiceProvider != null) {
332 state.addTransformation(transformation_pattern, transformationServiceProvider);
334 MqttBrokerConnection connection = getConnection();
335 if (connection != null) {
336 state.start(connection, scheduler, 0);
344 public void removeAvailabilityTopic(String availabilityTopic) {
345 availabilityStates.computeIfPresent(availabilityTopic, (topic, state) -> {
346 if (connection != null && state != null) {
354 public void clearAllAvailabilityTopics() {
355 Set<String> topics = new HashSet<>(availabilityStates.keySet());
356 topics.forEach(this::removeAvailabilityTopic);
360 public void resetMessageReceived() {
361 if (messageReceived.compareAndSet(true, false)) {
362 calculateAndUpdateThingStatus(false);
366 protected void calculateAndUpdateThingStatus(boolean lastValue) {
367 final Optional<Boolean> availabilityTopicsSeen;
369 if (availabilityStates.isEmpty()) {
370 availabilityTopicsSeen = Optional.empty();
372 availabilityTopicsSeen = switch (availabilityMode) {
373 case ALL -> Optional.of(availabilityStates.values().stream().allMatch(
374 c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
375 case ANY -> Optional.of(availabilityStates.values().stream().anyMatch(
376 c -> c != null && OnOffType.ON.equals(c.getCache().getChannelState().as(OnOffType.class))));
377 case LATEST -> Optional.of(lastValue);
380 updateThingStatus(messageReceived.get(), availabilityTopicsSeen);
383 protected abstract void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen);