]> git.basschouten.com Git - openhab-addons.git/blob
cb305be7841f87891b4483f50f2a02fb9fe67b53
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.handler;
14
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.Optional;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.function.Consumer;
21 import java.util.stream.Collectors;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
26 import org.openhab.binding.mqtt.generic.ChannelState;
27 import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
28 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
29 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
30 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
31 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
32 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
33 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
34 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
35 import org.openhab.binding.mqtt.homie.internal.homie300.HandlerConfiguration;
36 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
37 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
38 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
39 import org.openhab.core.thing.Channel;
40 import org.openhab.core.thing.ChannelUID;
41 import org.openhab.core.thing.Thing;
42 import org.openhab.core.thing.ThingStatus;
43 import org.openhab.core.thing.ThingStatusDetail;
44 import org.openhab.core.thing.ThingTypeUID;
45 import org.openhab.core.thing.type.ChannelGroupDefinition;
46 import org.openhab.core.thing.type.ChannelTypeRegistry;
47 import org.openhab.core.thing.type.ThingType;
48 import org.openhab.core.types.CommandDescription;
49 import org.openhab.core.types.StateDescription;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Handles MQTT topics that follow the Homie MQTT convention. The convention specifies a MQTT topic layout
55  * and defines Devices, Nodes and Properties, corresponding to Things, Channel Groups and Channels respectively.
56  *
57  * @author David Graeff - Initial contribution
58  */
59 @NonNullByDefault
60 public class HomieThingHandler extends AbstractMQTTThingHandler implements DeviceCallback, Consumer<List<Object>> {
61     private final Logger logger = LoggerFactory.getLogger(HomieThingHandler.class);
62     protected Device device;
63     protected final MqttChannelTypeProvider channelTypeProvider;
64     protected final MqttChannelStateDescriptionProvider stateDescriptionProvider;
65     protected final ChannelTypeRegistry channelTypeRegistry;
66     /** The timeout per attribute field subscription */
67     protected final int attributeReceiveTimeout;
68     protected final int subscribeTimeout;
69     protected final int deviceTimeout;
70     protected HandlerConfiguration config = new HandlerConfiguration();
71     protected DelayedBatchProcessing<Object> delayedProcessing;
72     private @Nullable ScheduledFuture<?> heartBeatTimer;
73
74     /**
75      * Create a new thing handler for homie discovered things. A channel type provider and a topic value receive timeout
76      * must be provided.
77      *
78      * @param thing The thing of this handler
79      * @param channelTypeProvider A channel type provider
80      * @param stateDescriptionProvider A state description provider
81      * @param channelTypeRegistry The channel type registry
82      * @param deviceTimeout Timeout for the entire device subscription. In milliseconds.
83      * @param subscribeTimeout Timeout for an entire attribute class subscription and receive. In milliseconds.
84      *            Even a slow remote device will publish a full node or property within 100ms.
85      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
86      *            One attribute subscription and receiving should not take longer than 50ms.
87      */
88     public HomieThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
89             MqttChannelStateDescriptionProvider stateDescriptionProvider, ChannelTypeRegistry channelTypeRegistry,
90             int deviceTimeout, int subscribeTimeout, int attributeReceiveTimeout) {
91         super(thing, deviceTimeout);
92         this.channelTypeProvider = channelTypeProvider;
93         this.stateDescriptionProvider = stateDescriptionProvider;
94         this.channelTypeRegistry = channelTypeRegistry;
95         this.deviceTimeout = deviceTimeout;
96         this.subscribeTimeout = subscribeTimeout;
97         this.attributeReceiveTimeout = attributeReceiveTimeout;
98         this.delayedProcessing = new DelayedBatchProcessing<>(subscribeTimeout, this, scheduler);
99         this.device = new Device(this.thing.getUID(), this, new DeviceAttributes());
100     }
101
102     /**
103      * Overwrite the {@link Device} and {@link DelayedBatchProcessing} object.
104      * Those are set in the constructor already, but require to be replaced for tests.
105      *
106      * @param device The device object
107      * @param delayedProcessing The delayed processing object
108      */
109     protected void setInternalObjects(Device device, DelayedBatchProcessing<Object> delayedProcessing) {
110         this.device = device;
111         this.delayedProcessing = delayedProcessing;
112     }
113
114     @Override
115     public void initialize() {
116         config = getConfigAs(HandlerConfiguration.class);
117         logger.debug("About to initialize Homie device {}", config.deviceid);
118         if (config.deviceid.isEmpty()) {
119             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
120             return;
121         }
122         device.initialize(config.basetopic, config.deviceid, thing.getChannels());
123
124         updateThingType();
125         if (getThing().getThingTypeUID().equals(MqttBindingConstants.HOMIE300_MQTT_THING)) {
126             logger.debug("Migrating Homie thing {} from generic type to dynamic type {}", getThing().getUID(),
127                     device.thingTypeUID);
128             changeThingType(device.thingTypeUID, getConfig());
129             return;
130         } else {
131             updateChannels();
132         }
133
134         super.initialize();
135     }
136
137     @Override
138     public void handleRemoval() {
139         this.stop();
140         if (config.removetopics) {
141             this.removeRetainedTopics();
142         }
143         super.handleRemoval();
144     }
145
146     @Override
147     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
148         logger.debug("About to start Homie device {}", config.deviceid);
149         if (connection.getQos() != 1) {
150             // QoS 1 is required.
151             logger.warn(
152                     "Homie devices require QoS 1 but Qos 0/2 is configured. Using override. Please check the configuration");
153             connection.setQos(1);
154         }
155         return device.subscribe(connection, scheduler, attributeReceiveTimeout)
156                 .thenCompose((Void v) -> device.startChannels(connection, scheduler, attributeReceiveTimeout, this))
157                 .thenRun(() -> {
158                     logger.debug("Homie device {} fully attached (start)", config.deviceid);
159                 });
160     }
161
162     @Override
163     protected void stop() {
164         logger.debug("About to stop Homie device {}", config.deviceid);
165         final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
166         if (heartBeatTimer != null) {
167             heartBeatTimer.cancel(false);
168             this.heartBeatTimer = null;
169         }
170         delayedProcessing.join();
171         device.stop();
172         channelTypeProvider.removeThingType(device.thingTypeUID);
173         super.stop();
174     }
175
176     @Override
177     public CompletableFuture<Void> unsubscribeAll() {
178         // already unsubscribed everything by calling stop()
179         return CompletableFuture.allOf();
180     }
181
182     @Override
183     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
184         Property property = device.getProperty(channelUID);
185         return property != null ? property.getChannelState() : null;
186     }
187
188     @Override
189     public void readyStateChanged(ReadyState state) {
190         switch (state) {
191             case alert:
192                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_ERROR);
193                 break;
194             case disconnected:
195                 updateStatus(ThingStatus.OFFLINE);
196                 break;
197             case init:
198                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_PENDING);
199                 break;
200             case lost:
201                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not send heartbeat in time");
202                 break;
203             case ready:
204                 updateStatus(ThingStatus.ONLINE);
205                 break;
206             case sleeping:
207                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.DUTY_CYCLE);
208                 break;
209             case unknown:
210                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not publish a ready state");
211                 break;
212             default:
213                 break;
214         }
215     }
216
217     @Override
218     public void nodeRemoved(Node node) {
219         channelTypeProvider.removeChannelGroupType(node.channelGroupTypeUID);
220         delayedProcessing.accept(node);
221     }
222
223     @Override
224     public void propertyRemoved(Property property) {
225         stateDescriptionProvider.remove(property.getChannelUID());
226         delayedProcessing.accept(property);
227     }
228
229     @Override
230     public void nodeAddedOrChanged(Node node) {
231         channelTypeProvider.setChannelGroupType(node.channelGroupTypeUID, node.type());
232         delayedProcessing.accept(node);
233     }
234
235     @Override
236     public void propertyAddedOrChanged(Property property) {
237         ChannelUID channelUID = property.getChannelUID();
238         stateDescriptionProvider.remove(channelUID);
239         StateDescription stateDescription = property.getStateDescription();
240         if (stateDescription != null) {
241             stateDescriptionProvider.setDescription(channelUID, stateDescription);
242         }
243         CommandDescription commandDescription = property.getCommandDescription();
244         if (commandDescription != null) {
245             stateDescriptionProvider.setDescription(channelUID, commandDescription);
246         }
247         delayedProcessing.accept(property);
248     }
249
250     /**
251      * Callback of {@link DelayedBatchProcessing}.
252      * Add all newly discovered nodes and properties to the Thing and start subscribe to each channel state topic.
253      */
254     @Override
255     public void accept(@Nullable List<Object> t) {
256         if (!device.isInitialized()) {
257             return;
258         }
259         updateProperty(MqttBindingConstants.HOMIE_PROPERTY_VERSION, device.attributes.homie);
260         updateThingType();
261         updateChannels();
262         final MqttBrokerConnection connection = this.connection;
263         if (connection != null) {
264             device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
265                 logger.debug("Homie device {} fully attached (accept)", config.deviceid);
266             });
267         }
268     }
269
270     /**
271      * Removes all retained topics related to the device
272      */
273     private void removeRetainedTopics() {
274         MqttBrokerConnection connection = this.connection;
275         if (connection == null) {
276             logger.warn("couldn't remove retained topics for {} because connection is null", thing.getUID());
277             return;
278         }
279         device.getRetainedTopics().stream().map(d -> String.format("%s/%s", config.basetopic, d))
280                 .collect(Collectors.toList()).forEach(t -> connection.publish(t, new byte[0], 1, true));
281     }
282
283     @Override
284     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
285         // not used here
286     }
287
288     private void updateThingType() {
289         // Make sure any dynamic channel types exist (i.e. ones created for a number channel with a specific dimension)
290         device.nodes.stream().flatMap(n -> n.properties.stream()).map(Property::getChannelType).filter(Objects::nonNull)
291                 .forEach(ct -> channelTypeProvider.setChannelType(ct.getUID(), ct));
292
293         // if this is a dynamic type, then we update the type
294         ThingTypeUID typeID = device.thingTypeUID;
295         if (!MqttBindingConstants.HOMIE300_MQTT_THING.equals(typeID)) {
296             device.nodes.stream()
297                     .forEach(n -> channelTypeProvider.setChannelGroupType(n.channelGroupTypeUID, n.type()));
298
299             List<ChannelGroupDefinition> groupDefs = device.nodes().stream().map(Node::getChannelGroupDefinition)
300                     .collect(Collectors.toList());
301             var builder = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMIE300_MQTT_THING)
302                     .withChannelGroupDefinitions(groupDefs);
303             ThingType thingType = builder.build();
304
305             channelTypeProvider.setThingType(typeID, thingType);
306         }
307     }
308
309     private void updateChannels() {
310         List<Channel> channels = device.nodes().stream().flatMap(n -> n.properties.stream())
311                 .map(p -> p.getChannel(channelTypeRegistry)).collect(Collectors.toList());
312         updateThing(editThing().withChannels(channels).build());
313     }
314 }