]> git.basschouten.com Git - openhab-addons.git/blob
f4e1899b65529bf2916a13824803068a797f43a2
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.handler;
14
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.function.Consumer;
22 import java.util.stream.Collectors;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
28 import org.openhab.binding.mqtt.generic.ChannelState;
29 import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
30 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
31 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
32 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
33 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
34 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
35 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
36 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
37 import org.openhab.binding.mqtt.homie.internal.homie300.HandlerConfiguration;
38 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
39 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
40 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
41 import org.openhab.core.thing.Channel;
42 import org.openhab.core.thing.ChannelUID;
43 import org.openhab.core.thing.Thing;
44 import org.openhab.core.thing.ThingStatus;
45 import org.openhab.core.thing.ThingStatusDetail;
46 import org.openhab.core.thing.ThingTypeUID;
47 import org.openhab.core.thing.type.ChannelGroupDefinition;
48 import org.openhab.core.thing.type.ChannelTypeRegistry;
49 import org.openhab.core.thing.type.ThingType;
50 import org.openhab.core.types.CommandDescription;
51 import org.openhab.core.types.StateDescription;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Handles MQTT topics that follow the Homie MQTT convention. The convention specifies a MQTT topic layout
57  * and defines Devices, Nodes and Properties, corresponding to Things, Channel Groups and Channels respectively.
58  *
59  * @author David Graeff - Initial contribution
60  */
61 @NonNullByDefault
62 public class HomieThingHandler extends AbstractMQTTThingHandler implements DeviceCallback, Consumer<List<Object>> {
63     private final Logger logger = LoggerFactory.getLogger(HomieThingHandler.class);
64     protected Device device;
65     protected final MqttChannelTypeProvider channelTypeProvider;
66     protected final MqttChannelStateDescriptionProvider stateDescriptionProvider;
67     protected final ChannelTypeRegistry channelTypeRegistry;
68     /** The timeout per attribute field subscription */
69     protected final int attributeReceiveTimeout;
70     protected final int subscribeTimeout;
71     protected final int deviceTimeout;
72     protected HandlerConfiguration config = new HandlerConfiguration();
73     protected DelayedBatchProcessing<Object> delayedProcessing;
74     private @Nullable ScheduledFuture<?> heartBeatTimer;
75
76     /**
77      * Create a new thing handler for homie discovered things. A channel type provider and a topic value receive timeout
78      * must be provided.
79      *
80      * @param thing The thing of this handler
81      * @param channelTypeProvider A channel type provider
82      * @param stateDescriptionProvider A state description provider
83      * @param channelTypeRegistry The channel type registry
84      * @param deviceTimeout Timeout for the entire device subscription. In milliseconds.
85      * @param subscribeTimeout Timeout for an entire attribute class subscription and receive. In milliseconds.
86      *            Even a slow remote device will publish a full node or property within 100ms.
87      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
88      *            One attribute subscription and receiving should not take longer than 50ms.
89      */
90     public HomieThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider,
91             MqttChannelStateDescriptionProvider stateDescriptionProvider, ChannelTypeRegistry channelTypeRegistry,
92             int deviceTimeout, int subscribeTimeout, int attributeReceiveTimeout) {
93         super(thing, deviceTimeout);
94         this.channelTypeProvider = channelTypeProvider;
95         this.stateDescriptionProvider = stateDescriptionProvider;
96         this.channelTypeRegistry = channelTypeRegistry;
97         this.deviceTimeout = deviceTimeout;
98         this.subscribeTimeout = subscribeTimeout;
99         this.attributeReceiveTimeout = attributeReceiveTimeout;
100         this.delayedProcessing = new DelayedBatchProcessing<>(subscribeTimeout, this, scheduler);
101         this.device = new Device(this.thing.getUID(), this, new DeviceAttributes());
102     }
103
104     /**
105      * Overwrite the {@link Device} and {@link DelayedBatchProcessing} object.
106      * Those are set in the constructor already, but require to be replaced for tests.
107      *
108      * @param device The device object
109      * @param delayedProcessing The delayed processing object
110      */
111     protected void setInternalObjects(Device device, DelayedBatchProcessing<Object> delayedProcessing) {
112         this.device = device;
113         this.delayedProcessing = delayedProcessing;
114     }
115
116     @Override
117     public void initialize() {
118         config = getConfigAs(HandlerConfiguration.class);
119         logger.debug("About to initialize Homie device {}", config.deviceid);
120         if (config.deviceid.isEmpty()) {
121             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
122             return;
123         }
124         device.initialize(config.basetopic, config.deviceid, thing.getChannels());
125
126         updateThingType();
127         if (getThing().getThingTypeUID().equals(MqttBindingConstants.HOMIE300_MQTT_THING)) {
128             logger.debug("Migrating Homie thing {} from generic type to dynamic type {}", getThing().getUID(),
129                     device.thingTypeUID);
130             changeThingType(device.thingTypeUID, getConfig());
131             return;
132         } else {
133             updateChannels();
134         }
135
136         super.initialize();
137     }
138
139     @Override
140     public void handleRemoval() {
141         this.stop();
142         if (config.removetopics) {
143             this.removeRetainedTopics();
144         }
145         channelTypeProvider.removeThingType(thing.getThingTypeUID());
146         channelTypeProvider.removeChannelGroupTypesForPrefix(thing.getThingTypeUID().getId());
147         super.handleRemoval();
148     }
149
150     @Override
151     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
152         logger.debug("About to start Homie device {}", config.deviceid);
153         if (connection.getQos() != 1) {
154             // QoS 1 is required.
155             logger.warn(
156                     "Homie devices require QoS 1 but Qos 0/2 is configured. Using override. Please check the configuration");
157             connection.setQos(1);
158         }
159         return device.subscribe(connection, scheduler, attributeReceiveTimeout)
160                 .thenCompose((Void v) -> device.startChannels(connection, scheduler, attributeReceiveTimeout, this))
161                 .thenRun(() -> {
162                     logger.debug("Homie device {} fully attached (start)", config.deviceid);
163                 });
164     }
165
166     @Override
167     protected void stop() {
168         logger.debug("About to stop Homie device {}", config.deviceid);
169         final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
170         if (heartBeatTimer != null) {
171             heartBeatTimer.cancel(false);
172             this.heartBeatTimer = null;
173         }
174         delayedProcessing.join();
175         device.stop();
176         super.stop();
177     }
178
179     @Override
180     public CompletableFuture<Void> unsubscribeAll() {
181         // already unsubscribed everything by calling stop()
182         return CompletableFuture.allOf();
183     }
184
185     @Override
186     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
187         Property property = device.getProperty(channelUID);
188         return property != null ? property.getChannelState() : null;
189     }
190
191     @Override
192     public void readyStateChanged(ReadyState state) {
193         switch (state) {
194             case alert:
195                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_ERROR);
196                 break;
197             case disconnected:
198                 updateStatus(ThingStatus.OFFLINE);
199                 break;
200             case init:
201                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_PENDING);
202                 break;
203             case lost:
204                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not send heartbeat in time");
205                 break;
206             case ready:
207                 updateStatus(ThingStatus.ONLINE);
208                 break;
209             case sleeping:
210                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.DUTY_CYCLE);
211                 break;
212             case unknown:
213                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not publish a ready state");
214                 break;
215             default:
216                 break;
217         }
218     }
219
220     @Override
221     public void nodeRemoved(Node node) {
222         delayedProcessing.accept(node);
223     }
224
225     @Override
226     public void propertyRemoved(Property property) {
227         stateDescriptionProvider.remove(property.getChannelUID());
228         delayedProcessing.accept(property);
229     }
230
231     @Override
232     public void nodeAddedOrChanged(Node node) {
233         delayedProcessing.accept(node);
234     }
235
236     @Override
237     public void propertyAddedOrChanged(Property property) {
238         ChannelUID channelUID = property.getChannelUID();
239         stateDescriptionProvider.remove(channelUID);
240         StateDescription stateDescription = property.getStateDescription();
241         if (stateDescription != null) {
242             stateDescriptionProvider.setDescription(channelUID, stateDescription);
243         }
244         CommandDescription commandDescription = property.getCommandDescription();
245         if (commandDescription != null) {
246             stateDescriptionProvider.setDescription(channelUID, commandDescription);
247         }
248         delayedProcessing.accept(property);
249     }
250
251     /**
252      * Callback of {@link DelayedBatchProcessing}.
253      * Add all newly discovered nodes and properties to the Thing and start subscribe to each channel state topic.
254      */
255     @Override
256     public void accept(@Nullable List<Object> t) {
257         if (!device.isInitialized()) {
258             return;
259         }
260         updateProperty(MqttBindingConstants.HOMIE_PROPERTY_VERSION, device.attributes.homie);
261         updateThingType();
262         updateChannels();
263         final MqttBrokerConnection connection = this.connection;
264         if (connection != null) {
265             device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
266                 logger.debug("Homie device {} fully attached (accept)", config.deviceid);
267             });
268         }
269     }
270
271     /**
272      * Removes all retained topics related to the device
273      */
274     private void removeRetainedTopics() {
275         MqttBrokerConnection connection = this.connection;
276         if (connection == null) {
277             logger.warn("couldn't remove retained topics for {} because connection is null", thing.getUID());
278             return;
279         }
280         device.getRetainedTopics().stream().map(d -> String.format("%s/%s", config.basetopic, d))
281                 .collect(Collectors.toList()).forEach(t -> connection.publish(t, new byte[0], 1, true));
282     }
283
284     @Override
285     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
286         // not used here
287     }
288
289     private void updateThingType() {
290         // Make sure any dynamic channel types exist (i.e. ones created for a number channel with a specific dimension)
291         device.nodes.stream().flatMap(n -> n.properties.stream()).map(Property::getChannelType).filter(Objects::nonNull)
292                 .forEach(ct -> channelTypeProvider.putChannelType(Objects.requireNonNull(ct)));
293
294         // if this is a dynamic type, then we update the type
295         ThingTypeUID typeID = device.thingTypeUID;
296         if (!MqttBindingConstants.HOMIE300_MQTT_THING.equals(typeID)) {
297             channelTypeProvider.updateChannelGroupTypesForPrefix(thing.getThingTypeUID().getId(), device.nodes.stream()
298                     .map(n -> n.type(thing.getThingTypeUID().getId(), channelTypeProvider)).toList());
299
300             List<ChannelGroupDefinition> groupDefs = device.nodes.stream(nodeOrder())
301                     .map(n -> n.getChannelGroupDefinition(thing.getThingTypeUID().getId())).toList();
302             var builder = channelTypeProvider.derive(typeID, MqttBindingConstants.HOMIE300_MQTT_THING)
303                     .withChannelGroupDefinitions(groupDefs);
304
305             channelTypeProvider.putThingType(builder.build());
306         }
307     }
308
309     private void updateChannels() {
310         List<Channel> channels = device.nodes.stream(nodeOrder())
311                 .flatMap(node -> node.properties
312                         .stream(node.propertyOrder(thing.getThingTypeUID().getId(), channelTypeProvider))
313                         .map(p -> p.getChannel(channelTypeRegistry)))
314                 .toList();
315         updateThing(editThing().withChannels(channels).build());
316     }
317
318     private Collection<String> nodeOrder() {
319         String[] nodes = device.attributes.nodes;
320         if (nodes != null) {
321             return Stream.of(nodes).toList();
322         }
323         ThingType thingType = channelTypeProvider.getThingType(thing.getThingTypeUID(), null);
324         if (thingType != null) {
325             return thingType.getChannelGroupDefinitions().stream().map(ChannelGroupDefinition::getId).toList();
326         }
327
328         return device.nodes.keySet();
329     }
330 }