2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homie.internal.handler;
15 import java.util.List;
16 import java.util.Optional;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.ScheduledFuture;
19 import java.util.function.Consumer;
20 import java.util.stream.Collectors;
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
25 import org.openhab.binding.mqtt.generic.ChannelState;
26 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
27 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
28 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
29 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
30 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
31 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
32 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
33 import org.openhab.binding.mqtt.homie.internal.homie300.HandlerConfiguration;
34 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
35 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
36 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
37 import org.openhab.core.thing.Channel;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Handles MQTT topics that follow the Homie MQTT convention. The convention specifies a MQTT topic layout
47 * and defines Devices, Nodes and Properties, corresponding to Things, Channel Groups and Channels respectively.
49 * @author David Graeff - Initial contribution
52 public class HomieThingHandler extends AbstractMQTTThingHandler implements DeviceCallback, Consumer<List<Object>> {
53 private final Logger logger = LoggerFactory.getLogger(HomieThingHandler.class);
54 protected Device device;
55 protected final MqttChannelTypeProvider channelTypeProvider;
56 /** The timeout per attribute field subscription */
57 protected final int attributeReceiveTimeout;
58 protected final int subscribeTimeout;
59 protected final int deviceTimeout;
60 protected HandlerConfiguration config = new HandlerConfiguration();
61 protected DelayedBatchProcessing<Object> delayedProcessing;
62 private @Nullable ScheduledFuture<?> heartBeatTimer;
65 * Create a new thing handler for homie discovered things. A channel type provider and a topic value receive timeout
68 * @param thing The thing of this handler
69 * @param channelTypeProvider A channel type provider
70 * @param deviceTimeout Timeout for the entire device subscription. In milliseconds.
71 * @param subscribeTimeout Timeout for an entire attribute class subscription and receive. In milliseconds.
72 * Even a slow remote device will publish a full node or property within 100ms.
73 * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
74 * One attribute subscription and receiving should not take longer than 50ms.
76 public HomieThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider, int deviceTimeout,
77 int subscribeTimeout, int attributeReceiveTimeout) {
78 super(thing, deviceTimeout);
79 this.channelTypeProvider = channelTypeProvider;
80 this.deviceTimeout = deviceTimeout;
81 this.subscribeTimeout = subscribeTimeout;
82 this.attributeReceiveTimeout = attributeReceiveTimeout;
83 this.delayedProcessing = new DelayedBatchProcessing<>(subscribeTimeout, this, scheduler);
84 this.device = new Device(this.thing.getUID(), this, new DeviceAttributes());
88 * Overwrite the {@link Device} and {@link DelayedBatchProcessing} object.
89 * Those are set in the constructor already, but require to be replaced for tests.
91 * @param device The device object
92 * @param delayedProcessing The delayed processing object
94 protected void setInternalObjects(Device device, DelayedBatchProcessing<Object> delayedProcessing) {
96 this.delayedProcessing = delayedProcessing;
100 public void initialize() {
101 config = getConfigAs(HandlerConfiguration.class);
102 logger.debug("About to initialize Homie device {}", config.deviceid);
103 if (config.deviceid.isEmpty()) {
104 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
107 device.initialize(config.basetopic, config.deviceid, thing.getChannels());
112 public void handleRemoval() {
114 if (config.removetopics) {
115 this.removeRetainedTopics();
117 super.handleRemoval();
121 protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
122 logger.debug("About to start Homie device {}", config.deviceid);
123 if (connection.getQos() != 1) {
124 // QoS 1 is required.
126 "Homie devices require QoS 1 but Qos 0/2 is configured. Using override. Please check the configuration");
127 connection.setQos(1);
129 return device.subscribe(connection, scheduler, attributeReceiveTimeout)
130 .thenCompose((Void v) -> device.startChannels(connection, scheduler, attributeReceiveTimeout, this))
132 logger.debug("Homie device {} fully attached (start)", config.deviceid);
137 protected void stop() {
138 logger.debug("About to stop Homie device {}", config.deviceid);
139 final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
140 if (heartBeatTimer != null) {
141 heartBeatTimer.cancel(false);
142 this.heartBeatTimer = null;
144 delayedProcessing.join();
150 public CompletableFuture<Void> unsubscribeAll() {
151 // already unsubscribed everything by calling stop()
152 return CompletableFuture.allOf();
156 public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
157 Property property = device.getProperty(channelUID);
158 return property != null ? property.getChannelState() : null;
162 public void readyStateChanged(ReadyState state) {
165 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_ERROR);
168 updateStatus(ThingStatus.OFFLINE);
171 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_PENDING);
174 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not send heartbeat in time");
177 updateStatus(ThingStatus.ONLINE);
180 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.DUTY_CYCLE);
183 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not publish a ready state");
191 public void nodeRemoved(Node node) {
192 channelTypeProvider.removeChannelGroupType(node.channelGroupTypeUID);
193 delayedProcessing.accept(node);
197 public void propertyRemoved(Property property) {
198 channelTypeProvider.removeChannelType(property.channelTypeUID);
199 delayedProcessing.accept(property);
203 public void nodeAddedOrChanged(Node node) {
204 channelTypeProvider.setChannelGroupType(node.channelGroupTypeUID, node.type());
205 delayedProcessing.accept(node);
209 public void propertyAddedOrChanged(Property property) {
210 channelTypeProvider.setChannelType(property.channelTypeUID, property.getType());
211 delayedProcessing.accept(property);
215 * Callback of {@link DelayedBatchProcessing}.
216 * Add all newly discovered nodes and properties to the Thing and start subscribe to each channel state topic.
219 public void accept(@Nullable List<Object> t) {
220 if (!device.isInitialized()) {
223 List<Channel> channels = device.nodes().stream().flatMap(n -> n.properties.stream()).map(Property::getChannel)
224 .collect(Collectors.toList());
225 updateThing(editThing().withChannels(channels).build());
226 updateProperty(MqttBindingConstants.HOMIE_PROPERTY_VERSION, device.attributes.homie);
227 final MqttBrokerConnection connection = this.connection;
228 if (connection != null) {
229 device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
230 logger.debug("Homie device {} fully attached (accept)", config.deviceid);
236 * Removes all retained topics related to the device
238 private void removeRetainedTopics() {
239 MqttBrokerConnection connection = this.connection;
240 if (connection == null) {
241 logger.warn("couldn't remove retained topics for {} because connection is null", thing.getUID());
244 device.getRetainedTopics().stream().map(d -> String.format("%s/%s", config.basetopic, d))
245 .collect(Collectors.toList()).forEach(t -> connection.publish(t, new byte[0], 1, true));
249 protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {