]> git.basschouten.com Git - openhab-addons.git/blob
6d974bf58e80b01d9fbdb2000794651ee79c3967
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.handler;
14
15 import java.util.List;
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.ScheduledFuture;
18 import java.util.function.Consumer;
19 import java.util.stream.Collectors;
20
21 import org.eclipse.jdt.annotation.NonNullByDefault;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
24 import org.openhab.binding.mqtt.generic.ChannelState;
25 import org.openhab.binding.mqtt.generic.MqttChannelTypeProvider;
26 import org.openhab.binding.mqtt.generic.tools.DelayedBatchProcessing;
27 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
28 import org.openhab.binding.mqtt.homie.internal.homie300.Device;
29 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes;
30 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceAttributes.ReadyState;
31 import org.openhab.binding.mqtt.homie.internal.homie300.DeviceCallback;
32 import org.openhab.binding.mqtt.homie.internal.homie300.HandlerConfiguration;
33 import org.openhab.binding.mqtt.homie.internal.homie300.Node;
34 import org.openhab.binding.mqtt.homie.internal.homie300.Property;
35 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
36 import org.openhab.core.thing.Channel;
37 import org.openhab.core.thing.ChannelUID;
38 import org.openhab.core.thing.Thing;
39 import org.openhab.core.thing.ThingStatus;
40 import org.openhab.core.thing.ThingStatusDetail;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Handles MQTT topics that follow the Homie MQTT convention. The convention specifies a MQTT topic layout
46  * and defines Devices, Nodes and Properties, corresponding to Things, Channel Groups and Channels respectively.
47  *
48  * @author David Graeff - Initial contribution
49  */
50 @NonNullByDefault
51 public class HomieThingHandler extends AbstractMQTTThingHandler implements DeviceCallback, Consumer<List<Object>> {
52     private final Logger logger = LoggerFactory.getLogger(HomieThingHandler.class);
53     protected Device device;
54     protected final MqttChannelTypeProvider channelTypeProvider;
55     /** The timeout per attribute field subscription */
56     protected final int attributeReceiveTimeout;
57     protected final int subscribeTimeout;
58     protected final int deviceTimeout;
59     protected HandlerConfiguration config = new HandlerConfiguration();
60     protected DelayedBatchProcessing<Object> delayedProcessing;
61     private @Nullable ScheduledFuture<?> heartBeatTimer;
62
63     /**
64      * Create a new thing handler for homie discovered things. A channel type provider and a topic value receive timeout
65      * must be provided.
66      *
67      * @param thing The thing of this handler
68      * @param channelTypeProvider A channel type provider
69      * @param deviceTimeout Timeout for the entire device subscription. In milliseconds.
70      * @param subscribeTimeout Timeout for an entire attribute class subscription and receive. In milliseconds.
71      *            Even a slow remote device will publish a full node or property within 100ms.
72      * @param attributeReceiveTimeout The timeout per attribute field subscription. In milliseconds.
73      *            One attribute subscription and receiving should not take longer than 50ms.
74      */
75     public HomieThingHandler(Thing thing, MqttChannelTypeProvider channelTypeProvider, int deviceTimeout,
76             int subscribeTimeout, int attributeReceiveTimeout) {
77         super(thing, deviceTimeout);
78         this.channelTypeProvider = channelTypeProvider;
79         this.deviceTimeout = deviceTimeout;
80         this.subscribeTimeout = subscribeTimeout;
81         this.attributeReceiveTimeout = attributeReceiveTimeout;
82         this.delayedProcessing = new DelayedBatchProcessing<>(subscribeTimeout, this, scheduler);
83         this.device = new Device(this.thing.getUID(), this, new DeviceAttributes());
84     }
85
86     /**
87      * Overwrite the {@link Device} and {@link DelayedBatchProcessing} object.
88      * Those are set in the constructor already, but require to be replaced for tests.
89      *
90      * @param device The device object
91      * @param delayedProcessing The delayed processing object
92      */
93     protected void setInternalObjects(Device device, DelayedBatchProcessing<Object> delayedProcessing) {
94         this.device = device;
95         this.delayedProcessing = delayedProcessing;
96     }
97
98     @Override
99     public void initialize() {
100         logger.debug("About to initialize Homie device {}", device.attributes.name);
101         config = getConfigAs(HandlerConfiguration.class);
102         if (config.deviceid.isEmpty()) {
103             updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "Object ID unknown");
104             return;
105         }
106         device.initialize(config.basetopic, config.deviceid, thing.getChannels());
107         super.initialize();
108     }
109
110     @Override
111     public void handleRemoval() {
112         this.stop();
113         if (config.removetopics) {
114             this.removeRetainedTopics();
115         }
116         super.handleRemoval();
117     }
118
119     @Override
120     protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
121         logger.debug("About to start Homie device {}", device.attributes.name);
122         if (connection.getQos() != 1) {
123             // QoS 1 is required.
124             logger.warn(
125                     "Homie devices require QoS 1 but Qos 0/2 is configured. Using override. Please check the configuration");
126             connection.setQos(1);
127         }
128         return device.subscribe(connection, scheduler, attributeReceiveTimeout).thenCompose((Void v) -> {
129             return device.startChannels(connection, scheduler, attributeReceiveTimeout, this);
130         }).thenRun(() -> {
131             logger.debug("Homie device {} fully attached (start)", device.attributes.name);
132         });
133     }
134
135     @Override
136     protected void stop() {
137         logger.debug("About to stop Homie device {}", device.attributes.name);
138         final ScheduledFuture<?> heartBeatTimer = this.heartBeatTimer;
139         if (heartBeatTimer != null) {
140             heartBeatTimer.cancel(false);
141             this.heartBeatTimer = null;
142         }
143         delayedProcessing.join();
144         device.stop();
145         super.stop();
146     }
147
148     @Override
149     public CompletableFuture<Void> unsubscribeAll() {
150         // already unsubscribed everything by calling stop()
151         return CompletableFuture.allOf();
152     }
153
154     @Override
155     public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
156         Property property = device.getProperty(channelUID);
157         return property != null ? property.getChannelState() : null;
158     }
159
160     @Override
161     public void readyStateChanged(ReadyState state) {
162         switch (state) {
163             case alert:
164                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_ERROR);
165                 break;
166             case disconnected:
167                 updateStatus(ThingStatus.OFFLINE);
168                 break;
169             case init:
170                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.CONFIGURATION_PENDING);
171                 break;
172             case lost:
173                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not send heartbeat in time");
174                 break;
175             case ready:
176                 updateStatus(ThingStatus.ONLINE);
177                 break;
178             case sleeping:
179                 updateStatus(ThingStatus.ONLINE, ThingStatusDetail.DUTY_CYCLE);
180                 break;
181             case unknown:
182                 updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.GONE, "Device did not publish a ready state");
183                 break;
184             default:
185                 break;
186         }
187     }
188
189     @Override
190     public void nodeRemoved(Node node) {
191         channelTypeProvider.removeChannelGroupType(node.channelGroupTypeUID);
192         delayedProcessing.accept(node);
193     }
194
195     @Override
196     public void propertyRemoved(Property property) {
197         channelTypeProvider.removeChannelType(property.channelTypeUID);
198         delayedProcessing.accept(property);
199     }
200
201     @Override
202     public void nodeAddedOrChanged(Node node) {
203         channelTypeProvider.setChannelGroupType(node.channelGroupTypeUID, node.type());
204         delayedProcessing.accept(node);
205     }
206
207     @Override
208     public void propertyAddedOrChanged(Property property) {
209         channelTypeProvider.setChannelType(property.channelTypeUID, property.getType());
210         delayedProcessing.accept(property);
211     }
212
213     /**
214      * Callback of {@link DelayedBatchProcessing}.
215      * Add all newly discovered nodes and properties to the Thing and start subscribe to each channel state topic.
216      */
217     @Override
218     public void accept(@Nullable List<Object> t) {
219         if (!device.isInitialized()) {
220             return;
221         }
222         List<Channel> channels = device.nodes().stream().flatMap(n -> n.properties.stream()).map(Property::getChannel)
223                 .collect(Collectors.toList());
224         updateThing(editThing().withChannels(channels).build());
225         updateProperty(MqttBindingConstants.HOMIE_PROPERTY_VERSION, device.attributes.homie);
226         final MqttBrokerConnection connection = this.connection;
227         if (connection != null) {
228             device.startChannels(connection, scheduler, attributeReceiveTimeout, this).thenRun(() -> {
229                 logger.debug("Homie device {} fully attached (accept)", device.attributes.name);
230             });
231         }
232     }
233
234     /**
235      * Removes all retained topics related to the device
236      */
237     private void removeRetainedTopics() {
238         MqttBrokerConnection connection = this.connection;
239         if (connection == null) {
240             logger.warn("couldn't remove retained topics for {} because connection is null", thing.getUID());
241             return;
242         }
243         device.getRetainedTopics().stream().map(d -> {
244             return String.format("%s/%s", config.basetopic, d);
245         }).collect(Collectors.toList()).forEach(t -> connection.publish(t, new byte[0], 1, true));
246     }
247
248     @Override
249     protected void updateThingStatus(boolean messageReceived, boolean availabilityTopicsSeen) {
250         // not used here
251     }
252 }