]> git.basschouten.com Git - openhab-addons.git/blob
a7038a1ecdeb75bdf5a903843b7acb05e2f379f5
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2021 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.homie300;
14
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.stream.Collectors;
20 import java.util.stream.Stream;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.generic.ChannelConfig;
25 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
26 import org.openhab.binding.mqtt.generic.tools.ChildMap;
27 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
28 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
29 import org.openhab.core.thing.Channel;
30 import org.openhab.core.thing.ChannelUID;
31 import org.openhab.core.thing.ThingUID;
32 import org.openhab.core.util.UIDUtils;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Homie 3.x Device. This is also the base class to subscribe to and parse a homie MQTT topic tree.
38  * First use {@link #subscribe(AbstractMqttAttributeClass)} to subscribe to the device/nodes/properties tree.
39  * If everything has been received and parsed, call {@link #startChannels(MqttBrokerConnection, HomieThingHandler)}
40  * to also subscribe to the property values. Usage:
41  *
42  * <pre>
43  * Device device(thingUID, callback);
44  * device.subscribe(topicMapper,timeout).thenRun(()-> {
45  *   System.out.println("All attributes received. Device tree ready");
46  *   device.startChannels(connection, handler);
47  * });
48  * </pre>
49  *
50  * @author David Graeff - Initial contribution
51  */
52 @NonNullByDefault
53 public class Device implements AbstractMqttAttributeClass.AttributeChanged {
54     private final Logger logger = LoggerFactory.getLogger(Device.class);
55     // The device attributes, statistics and nodes of this device
56     public final DeviceAttributes attributes;
57     public final ChildMap<Node> nodes;
58
59     // The corresponding ThingUID and callback of this device object
60     public final ThingUID thingUID;
61     private final DeviceCallback callback;
62
63     // Unique identifier and topic
64     private String topic = "";
65     public String deviceID = "";
66     private boolean initialized = false;
67
68     /**
69      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
70      *
71      * @param thingUID The thing UID
72      * @param callback A callback, used to notify about new/removed nodes/properties and more.
73      * @param attributes The device attributes object
74      */
75     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes) {
76         this.thingUID = thingUID;
77         this.callback = callback;
78         this.attributes = attributes;
79         this.nodes = new ChildMap<>();
80     }
81
82     /**
83      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
84      *
85      * @param thingUID The thing UID
86      * @param callback A callback, used to notify about new/removed nodes/properties and more.
87      * @param attributes The device attributes object
88      * @param nodes The nodes map
89      */
90     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes, ChildMap<Node> nodes) {
91         this.thingUID = thingUID;
92         this.callback = callback;
93         this.attributes = attributes;
94         this.nodes = nodes;
95     }
96
97     /**
98      * Subscribe to all device attributes and device statistics. Parse the nodes
99      * and subscribe to all node attributes. Parse node properties. This will not subscribe
100      * to properties though. If subscribing to all necessary topics worked {@link #isInitialized()} will return true.
101      *
102      * Call {@link #startChannels(MqttBrokerConnection)} subsequently.
103      *
104      * @param connection A broker connection
105      * @param scheduler A scheduler to realize the timeout
106      * @param timeout A timeout in milliseconds
107      * @return A future that is complete as soon as all attributes, nodes and properties have been requested and have
108      *         been subscribed to.
109      */
110     public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
111             ScheduledExecutorService scheduler, int timeout) {
112         if (topic.isEmpty()) {
113             throw new IllegalStateException("You must call initialize()!");
114         }
115
116         return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
117                 // On success, create all nodes and tell the handler about the ready state
118                 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
119                 // No matter if values have been received or not -> the subscriptions have been performed
120                 .whenComplete((r, e) -> {
121                     initialized = true;
122                 });
123     }
124
125     public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
126             ScheduledExecutorService scheduler, int timeout) {
127         callback.readyStateChanged(attributes.state);
128         return applyNodes(connection, scheduler, timeout);
129     }
130
131     /**
132      * Subscribe to all property state topics. The handler will receive an update call for each
133      * received value. Therefore the thing channels should have been created before.
134      *
135      * @param connection A broker connection
136      * @param scheduler A scheduler to realize the timeout
137      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
138      * @param handler The Homie handler, that receives property (channel) updates.
139      * @return A future that is complete as soon as all properties have subscribed to their state topics.
140      */
141     public CompletableFuture<@Nullable Void> startChannels(MqttBrokerConnection connection,
142             ScheduledExecutorService scheduler, int timeout, HomieThingHandler handler) {
143         if (!isInitialized() || deviceID.isEmpty()) {
144             CompletableFuture<@Nullable Void> c = new CompletableFuture<>();
145             c.completeExceptionally(new Exception("Homie Device Tree not inialized yet."));
146             return c;
147         }
148
149         return CompletableFuture.allOf(nodes.stream().flatMap(node -> node.properties.stream())
150                 .map(p -> p.startChannel(connection, scheduler, timeout)).toArray(CompletableFuture[]::new));
151     }
152
153     /**
154      * Get a homie property (which translates to a channel).
155      *
156      * @param channelUID The group ID corresponds to the Homie Node, the channel ID (without group ID) corresponds to
157      *            the Nodes Property.
158      * @return A Homie Property, addressed by the given ChannelUID
159      */
160     @SuppressWarnings({ "null", "unused" })
161     public @Nullable Property getProperty(ChannelUID channelUID) {
162         final String groupId = channelUID.getGroupId();
163         if (groupId == null) {
164             return null;
165         }
166         Node node = nodes.get(UIDUtils.decode(groupId));
167         if (node == null) {
168             return null;
169         }
170         return node.properties.get(UIDUtils.decode(channelUID.getIdWithoutGroup()));
171     }
172
173     /**
174      * Unsubscribe from everything.
175      */
176     public CompletableFuture<@Nullable Void> stop() {
177         return attributes.unsubscribe().thenCompose(
178                 b -> CompletableFuture.allOf(nodes.stream().map(Node::stop).toArray(CompletableFuture[]::new)));
179     }
180
181     /**
182      * Return all homie nodes on this device
183      */
184     public ChildMap<Node> nodes() {
185         return nodes;
186     }
187
188     /**
189      * @return Return true if this device is initialized
190      */
191     public boolean isInitialized() {
192         return initialized;
193     }
194
195     /**
196      * Restore Nodes and Properties from Thing channels after handler initalization.
197      *
198      * @param channels
199      */
200     @SuppressWarnings({ "null", "unused" })
201     public void initialize(String baseTopic, String deviceID, List<Channel> channels) {
202         this.topic = baseTopic + "/" + deviceID;
203         this.deviceID = deviceID;
204         nodes.clear();
205         for (Channel channel : channels) {
206             final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
207             if (!channelConfig.commandTopic.isEmpty() && !channelConfig.retained) {
208                 logger.warn("Channel {} in device {} is missing the 'retained' flag. Check your configuration.",
209                         channel.getUID(), deviceID);
210             }
211             final String channelGroupId = channel.getUID().getGroupId();
212             if (channelGroupId == null) {
213                 continue;
214             }
215             final String nodeID = UIDUtils.decode(channelGroupId);
216             final String propertyID = UIDUtils.decode(channel.getUID().getIdWithoutGroup());
217             Node node = nodes.get(nodeID);
218             if (node == null) {
219                 node = createNode(nodeID);
220                 node.nodeRestoredFromConfig();
221                 nodes.put(nodeID, node);
222             }
223             // Restores the properties attribute object via the channels configuration.
224             Property property = node.createProperty(propertyID,
225                     channel.getConfiguration().as(PropertyAttributes.class));
226             property.attributesReceived();
227
228             node.properties.put(propertyID, property);
229         }
230     }
231
232     /**
233      * Creates a new Homie Node, a child of this Homie Device.
234      *
235      * <p>
236      * Implementation detail: Cannot be used for mocking or spying within tests.
237      * </p>
238      *
239      * @param nodeID The node ID
240      * @return A child node
241      */
242     public Node createNode(String nodeID) {
243         return new Node(topic, nodeID, thingUID, callback, new NodeAttributes());
244     }
245
246     /**
247      * Creates a new Homie Node, a child of this Homie Device.
248      *
249      * @param nodeID The node ID
250      * @param attributes The node attributes object
251      * @return A child node
252      */
253     public Node createNode(String nodeID, NodeAttributes attributes) {
254         return new Node(topic, nodeID, thingUID, callback, attributes);
255     }
256
257     /**
258      * <p>
259      * The nodes of a device are determined by the device attribute "$nodes". If that attribute changes,
260      * {@link #attributeChanged(CompletableFuture, String, Object, MqttBrokerConnection, ScheduledExecutorService)} is
261      * called. The {@link #nodes} map will be synchronized and this method will be called for every removed node.
262      * </p>
263      *
264      * <p>
265      * This method will stop the node and will notify about the removed node all removed properties.
266      * </p>
267      *
268      * @param node The removed node.
269      */
270     protected void notifyNodeRemoved(Node node) {
271         node.stop();
272         node.properties.stream().forEach(property -> node.notifyPropertyRemoved(property));
273         callback.nodeRemoved(node);
274     }
275
276     CompletableFuture<@Nullable Void> applyNodes(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
277             int timeout) {
278         return nodes.apply(attributes.nodes, node -> node.subscribe(connection, scheduler, timeout), this::createNode,
279                 this::notifyNodeRemoved).exceptionally(e -> {
280                     logger.warn("Could not subscribe", e);
281                     return null;
282                 });
283     }
284
285     @Override
286     public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
287             ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
288         if (!initialized || !allMandatoryFieldsReceived) {
289             return;
290         }
291         // Special case: Not all fields were known before
292         if (!attributes.isComplete()) {
293             attributesReceived(connection, scheduler, 500);
294         } else {
295             switch (name) {
296                 case "state": {
297                     callback.readyStateChanged(attributes.state);
298                     return;
299                 }
300                 case "nodes": {
301                     applyNodes(connection, scheduler, 500);
302                     return;
303                 }
304             }
305         }
306     }
307
308     /**
309      * Creates a list of retained topics related to the device
310      *
311      * @return Returns a list of relative topics
312      */
313     public List<String> getRetainedTopics() {
314         List<String> topics = new ArrayList<>();
315
316         topics.addAll(Stream.of(this.attributes.getClass().getDeclaredFields()).map(f -> {
317             return String.format("%s/$%s", this.deviceID, f.getName());
318         }).collect(Collectors.toList()));
319
320         this.nodes.stream().map(n -> n.getRetainedTopics().stream().map(a -> {
321             return String.format("%s/%s", this.deviceID, a);
322         }).collect(Collectors.toList())).collect(Collectors.toList()).forEach(topics::addAll);
323
324         return topics;
325     }
326 }