]> git.basschouten.com Git - openhab-addons.git/blob
96a5a2d596d20029879aa444c86677b98717bef1
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.handler;
14
15 import java.util.List;
16 import java.util.Optional;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.ScheduledFuture;
19 import java.util.function.Consumer;
20 import java.util.stream.Collectors;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
25 import org.openhab.binding.mqtt.generic.ChannelState;
26 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
27 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
28 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
29 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
30 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
31 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
32 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
33 import org.openhab.binding.mqtt.homie.internal.homie300.HandlerConfiguration;
34 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
35 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
36 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
37 import org.openhab.core.thing.Channel;
38 import org.openhab.core.thing.ChannelUID;
39 import org.openhab.core.thing.Thing;
40 import org.openhab.core.thing.ThingStatus;
41 import org.openhab.core.thing.ThingStatusDetail;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Handles MQTT topics that follow the Homie MQTT convention. The convention specifies a MQTT topic layout
47  * and defines Devices, Nodes and Properties, corresponding to Things, Channel Groups and Channels respectively.
48  *
49  * @author David Graeff - Initial contribution
50  */
51 @NonNullByDefault
52 public class HomieThingHandler extends AbstractMQTTThingHandler implements DeviceCallback, Consumer<List<Object>> {
53     private final Logger logger = LoggerFactory.getLogger(HomieThingHandler.class);
54     protected Device device;
55     protected final MqttChannelTypeProvider channelTypeProvider;
56     /** The timeout per attribute field subscription */
57     protected final int attributeReceiveTimeout;
58     protected final int subscribeTimeout;
59     protected final int deviceTimeout;
60     protected HandlerConfiguration config = new HandlerConfiguration();
61     protected DelayedBatchProcessing<Object> delayedProcessing;
62     private @Nullable ScheduledFuture<?> heartBeatTimer;
63
64     /**
65      * Create a new thing handler for homie discovered things. A channel type provider and a topic value receive timeout
66      * must be provided.
67      *
68      * @param thing The thing of this handler
69      * @param channelTypeProvider A channel type provider
70      * @param deviceTimeout Timeout for the entire device subscription. In milliseconds.
71      * @param subscribeTimeout Timeout for an entire attribute class subscription and receive. In milliseconds.
72      *            Even a slow remote device will publish a full node or property within 100ms.
73      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
74      *            One attribute subscription and receiving should not take longer than 50ms.
75      */
76     public HomieThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider, int deviceTimeout,
77             int subscribeTimeout, int attributeReceiveTimeout) {
78         super(thing, deviceTimeout);
79         this.channelTypeProvider = channelTypeProvider;
80         this.deviceTimeout = deviceTimeout;
81         this.subscribeTimeout = subscribeTimeout;
82         this.attributeReceiveTimeout = attributeReceiveTimeout;
83         this.delayedProcessing = new DelayedBatchProcessing<>(subscribeTimeout, this, scheduler);
84         this.device = new Device(this.thing.getUID(), this, new DeviceAttributes());
85     }
86
87     /**
88      * Overwrite the {@link Device} and {@link DelayedBatchProcessing} object.
89      * Those are set in the constructor already, but require to be replaced for tests.
90      *
91      * @param device The device object
92      * @param delayedProcessing The delayed processing object
93      */
94     protected void setInternalObjects(Device device, DelayedBatchProcessing<Object> delayedProcessing) {
95         this.device = device;
96         this.delayedProcessing = delayedProcessing;
97     }
98
99     @Override
100     public void initialize() {
101         config = getConfigAs(HandlerConfiguration.class);
102         logger.debug("About to initialize Homie device {}", config.deviceid);
103         if (config.deviceid.isEmpty()) {
104             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
105             return;
106         }
107         device.initialize(config.basetopic, config.deviceid, thing.getChannels());
108         super.initialize();
109     }
110
111     @Override
112     public void handleRemoval() {
113         this.stop();
114         if (config.removetopics) {
115             this.removeRetainedTopics();
116         }
117         super.handleRemoval();
118     }
119
120     @Override
121     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
122         logger.debug("About to start Homie device {}", config.deviceid);
123         if (connection.getQos() != 1) {
124             // QoS 1 is required.
125             logger.warn(
126                     "Homie devices require QoS 1 but Qos 0/2 is configured. Using override. Please check the configuration");
127             connection.setQos(1);
128         }
129         return device.subscribe(connection, scheduler, attributeReceiveTimeout)
130                 .thenCompose((Void v) -> device.startChannels(connection, scheduler, attributeReceiveTimeout, this))
131                 .thenRun(() -> {
132                     logger.debug("Homie device {} fully attached (start)", config.deviceid);
133                 });
134     }
135
136     @Override
137     protected void stop() {
138         logger.debug("About to stop Homie device {}", config.deviceid);
139         final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
140         if (heartBeatTimer != null) {
141             heartBeatTimer.cancel(false);
142             this.heartBeatTimer = null;
143         }
144         delayedProcessing.join();
145         device.stop();
146         super.stop();
147     }
148
149     @Override
150     public CompletableFuture<Void> unsubscribeAll() {
151         // already unsubscribed everything by calling stop()
152         return CompletableFuture.allOf();
153     }
154
155     @Override
156     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
157         Property property = device.getProperty(channelUID);
158         return property != null ? property.getChannelState() : null;
159     }
160
161     @Override
162     public void readyStateChanged(ReadyState state) {
163         switch (state) {
164             case alert:
165                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_ERROR);
166                 break;
167             case disconnected:
168                 updateStatus(ThingStatus.OFFLINE);
169                 break;
170             case init:
171                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_PENDING);
172                 break;
173             case lost:
174                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not send heartbeat in time");
175                 break;
176             case ready:
177                 updateStatus(ThingStatus.ONLINE);
178                 break;
179             case sleeping:
180                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.DUTY_CYCLE);
181                 break;
182             case unknown:
183                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not publish a ready state");
184                 break;
185             default:
186                 break;
187         }
188     }
189
190     @Override
191     public void nodeRemoved(Node node) {
192         channelTypeProvider.removeChannelGroupType(node.channelGroupTypeUID);
193         delayedProcessing.accept(node);
194     }
195
196     @Override
197     public void propertyRemoved(Property property) {
198         channelTypeProvider.removeChannelType(property.channelTypeUID);
199         delayedProcessing.accept(property);
200     }
201
202     @Override
203     public void nodeAddedOrChanged(Node node) {
204         channelTypeProvider.setChannelGroupType(node.channelGroupTypeUID, node.type());
205         delayedProcessing.accept(node);
206     }
207
208     @Override
209     public void propertyAddedOrChanged(Property property) {
210         channelTypeProvider.setChannelType(property.channelTypeUID, property.getType());
211         delayedProcessing.accept(property);
212     }
213
214     /**
215      * Callback of {@link DelayedBatchProcessing}.
216      * Add all newly discovered nodes and properties to the Thing and start subscribe to each channel state topic.
217      */
218     @Override
219     public void accept(@Nullable List<Object> t) {
220         if (!device.isInitialized()) {
221             return;
222         }
223         List<Channel> channels = device.nodes().stream().flatMap(n -> n.properties.stream()).map(Property::getChannel)
224                 .collect(Collectors.toList());
225         updateThing(editThing().withChannels(channels).build());
226         updateProperty(MqttBindingConstants.HOMIE_PROPERTY_VERSION, device.attributes.homie);
227         final MqttBrokerConnection connection = this.connection;
228         if (connection != null) {
229             device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
230                 logger.debug("Homie device {} fully attached (accept)", config.deviceid);
231             });
232         }
233     }
234
235     /**
236      * Removes all retained topics related to the device
237      */
238     private void removeRetainedTopics() {
239         MqttBrokerConnection connection = this.connection;
240         if (connection == null) {
241             logger.warn("couldn't remove retained topics for {} because connection is null", thing.getUID());
242             return;
243         }
244         device.getRetainedTopics().stream().map(d -> String.format("%s/%s", config.basetopic, d))
245                 .collect(Collectors.toList()).forEach(t -> connection.publish(t, new byte[0], 1, true));
246     }
247
248     @Override
249     protected void updateThingStatus(boolean messageReceived, Optional<Boolean> availabilityTopicsSeen) {
250         // not used here
251     }
252 }