]> git.basschouten.com Git - openhab-addons.git/blob
be4a35cd89e2a005c1b46f0109e6472f6b4a33f7
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.homie300;
14
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.stream.Collectors;
21 import java.util.stream.Stream;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.ChannelConfig;
26 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
27 import org.openhab.binding.mqtt.generic.tools.ChildMap;
28 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.thing.Channel;
31 import org.openhab.core.thing.ChannelUID;
32 import org.openhab.core.thing.ThingUID;
33 import org.openhab.core.util.UIDUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Homie 3.x Device. This is also the base class to subscribe to and parse a homie MQTT topic tree.
39  * First use {@link #subscribe(AbstractMqttAttributeClass)} to subscribe to the device/nodes/properties tree.
40  * If everything has been received and parsed, call {@link #startChannels(MqttBrokerConnection, HomieThingHandler)}
41  * to also subscribe to the property values. Usage:
42  *
43  * <pre>
44  * Device device(thingUID, callback);
45  * device.subscribe(topicMapper,timeout).thenRun(()-> {
46  *   System.out.println("All attributes received. Device tree ready");
47  *   device.startChannels(connection, handler);
48  * });
49  * </pre>
50  *
51  * @author David Graeff - Initial contribution
52  */
53 @NonNullByDefault
54 public class Device implements AbstractMqttAttributeClass.AttributeChanged {
55     private final Logger logger = LoggerFactory.getLogger(Device.class);
56     // The device attributes, statistics and nodes of this device
57     public final DeviceAttributes attributes;
58     public final ChildMap<Node> nodes;
59
60     // The corresponding ThingUID and callback of this device object
61     public final ThingUID thingUID;
62     private final DeviceCallback callback;
63
64     // Unique identifier and topic
65     private String topic = "";
66     public String deviceID = "";
67     private boolean initialized = false;
68
69     /**
70      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
71      *
72      * @param thingUID The thing UID
73      * @param callback A callback, used to notify about new/removed nodes/properties and more.
74      * @param attributes The device attributes object
75      */
76     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes) {
77         this.thingUID = thingUID;
78         this.callback = callback;
79         this.attributes = attributes;
80         this.nodes = new ChildMap<>();
81     }
82
83     /**
84      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
85      *
86      * @param thingUID The thing UID
87      * @param callback A callback, used to notify about new/removed nodes/properties and more.
88      * @param attributes The device attributes object
89      * @param nodes The nodes map
90      */
91     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes, ChildMap<Node> nodes) {
92         this.thingUID = thingUID;
93         this.callback = callback;
94         this.attributes = attributes;
95         this.nodes = nodes;
96     }
97
98     /**
99      * Subscribe to all device attributes and device statistics. Parse the nodes
100      * and subscribe to all node attributes. Parse node properties. This will not subscribe
101      * to properties though. If subscribing to all necessary topics worked {@link #isInitialized()} will return true.
102      *
103      * Call {@link #startChannels(MqttBrokerConnection)} subsequently.
104      *
105      * @param connection A broker connection
106      * @param scheduler A scheduler to realize the timeout
107      * @param timeout A timeout in milliseconds
108      * @return A future that is complete as soon as all attributes, nodes and properties have been requested and have
109      *         been subscribed to.
110      */
111     public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
112             ScheduledExecutorService scheduler, int timeout) {
113         if (topic.isEmpty()) {
114             throw new IllegalStateException("You must call initialize()!");
115         }
116
117         return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
118                 // On success, create all nodes and tell the handler about the ready state
119                 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
120                 // No matter if values have been received or not -> the subscriptions have been performed
121                 .whenComplete((r, e) -> {
122                     initialized = true;
123                 });
124     }
125
126     public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
127             ScheduledExecutorService scheduler, int timeout) {
128         callback.readyStateChanged(attributes.state);
129         return applyNodes(connection, scheduler, timeout);
130     }
131
132     /**
133      * Subscribe to all property state topics. The handler will receive an update call for each
134      * received value. Therefore the thing channels should have been created before.
135      *
136      * @param connection A broker connection
137      * @param scheduler A scheduler to realize the timeout
138      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
139      * @param handler The Homie handler, that receives property (channel) updates.
140      * @return A future that is complete as soon as all properties have subscribed to their state topics.
141      */
142     public CompletableFuture<@Nullable Void> startChannels(MqttBrokerConnection connection,
143             ScheduledExecutorService scheduler, int timeout, HomieThingHandler handler) {
144         if (!isInitialized() || deviceID.isEmpty()) {
145             CompletableFuture<@Nullable Void> c = new CompletableFuture<>();
146             c.completeExceptionally(new Exception("Homie Device Tree not inialized yet."));
147             return c;
148         }
149
150         return CompletableFuture.allOf(nodes.stream().flatMap(node -> node.properties.stream())
151                 .map(p -> p.startChannel(connection, scheduler, timeout)).toArray(CompletableFuture[]::new));
152     }
153
154     /**
155      * Get a homie property (which translates to a channel).
156      *
157      * @param channelUID The group ID corresponds to the Homie Node, the channel ID (without group ID) corresponds to
158      *            the Nodes Property.
159      * @return A Homie Property, addressed by the given ChannelUID
160      */
161     @SuppressWarnings({ "null", "unused" })
162     public @Nullable Property getProperty(ChannelUID channelUID) {
163         final String groupId = channelUID.getGroupId();
164         if (groupId == null) {
165             return null;
166         }
167         Node node = nodes.get(UIDUtils.decode(groupId));
168         if (node == null) {
169             return null;
170         }
171         return node.properties.get(UIDUtils.decode(channelUID.getIdWithoutGroup()));
172     }
173
174     /**
175      * Unsubscribe from everything.
176      */
177     public CompletableFuture<@Nullable Void> stop() {
178         return attributes.unsubscribe().thenCompose(
179                 b -> CompletableFuture.allOf(nodes.stream().map(Node::stop).toArray(CompletableFuture[]::new)));
180     }
181
182     /**
183      * Return all homie nodes on this device
184      */
185     public ChildMap<Node> nodes() {
186         return nodes;
187     }
188
189     /**
190      * @return Return true if this device is initialized
191      */
192     public boolean isInitialized() {
193         return initialized;
194     }
195
196     /**
197      * Restore Nodes and Properties from Thing channels after handler initalization.
198      *
199      * @param channels
200      */
201     @SuppressWarnings({ "null", "unused" })
202     public void initialize(String baseTopic, String deviceID, List<Channel> channels) {
203         this.topic = baseTopic + "/" + deviceID;
204         this.deviceID = deviceID;
205         nodes.clear();
206         for (Channel channel : channels) {
207             final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
208             if (!channelConfig.commandTopic.isEmpty() && !channelConfig.retained) {
209                 logger.warn("Channel {} in device {} is missing the 'retained' flag. Check your configuration.",
210                         channel.getUID(), deviceID);
211             }
212             final String channelGroupId = channel.getUID().getGroupId();
213             if (channelGroupId == null) {
214                 continue;
215             }
216             final String nodeID = UIDUtils.decode(channelGroupId);
217             final String propertyID = UIDUtils.decode(channel.getUID().getIdWithoutGroup());
218             Node node = nodes.get(nodeID);
219             if (node == null) {
220                 node = createNode(nodeID);
221                 node.nodeRestoredFromConfig();
222                 nodes.put(nodeID, node);
223             }
224             // Restores the properties attribute object via the channels configuration.
225             Property property = node.createProperty(propertyID,
226                     channel.getConfiguration().as(PropertyAttributes.class));
227             property.attributesReceived();
228
229             node.properties.put(propertyID, property);
230         }
231     }
232
233     /**
234      * Creates a new Homie Node, a child of this Homie Device.
235      *
236      * <p>
237      * Implementation detail: Cannot be used for mocking or spying within tests.
238      * </p>
239      *
240      * @param nodeID The node ID
241      * @return A child node
242      */
243     public Node createNode(String nodeID) {
244         return new Node(topic, nodeID, thingUID, callback, new NodeAttributes());
245     }
246
247     /**
248      * Creates a new Homie Node, a child of this Homie Device.
249      *
250      * @param nodeID The node ID
251      * @param attributes The node attributes object
252      * @return A child node
253      */
254     public Node createNode(String nodeID, NodeAttributes attributes) {
255         return new Node(topic, nodeID, thingUID, callback, attributes);
256     }
257
258     /**
259      * <p>
260      * The nodes of a device are determined by the device attribute "$nodes". If that attribute changes,
261      * {@link #attributeChanged(CompletableFuture, String, Object, MqttBrokerConnection, ScheduledExecutorService)} is
262      * called. The {@link #nodes} map will be synchronized and this method will be called for every removed node.
263      * </p>
264      *
265      * <p>
266      * This method will stop the node and will notify about the removed node all removed properties.
267      * </p>
268      *
269      * @param node The removed node.
270      */
271     protected void notifyNodeRemoved(Node node) {
272         node.stop();
273         node.properties.stream().forEach(property -> node.notifyPropertyRemoved(property));
274         callback.nodeRemoved(node);
275     }
276
277     CompletableFuture<@Nullable Void> applyNodes(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
278             int timeout) {
279         return nodes.apply(Objects.requireNonNull(attributes.nodes),
280                 node -> node.subscribe(connection, scheduler, timeout), this::createNode, this::notifyNodeRemoved)
281                 .exceptionally(e -> {
282                     logger.warn("Could not subscribe", e);
283                     return null;
284                 });
285     }
286
287     @Override
288     public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
289             ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
290         if (!initialized || !allMandatoryFieldsReceived) {
291             return;
292         }
293         // Special case: Not all fields were known before
294         if (!attributes.isComplete()) {
295             attributesReceived(connection, scheduler, 500);
296         } else {
297             switch (name) {
298                 case "state": {
299                     callback.readyStateChanged(attributes.state);
300                     return;
301                 }
302                 case "nodes": {
303                     applyNodes(connection, scheduler, 500);
304                     return;
305                 }
306             }
307         }
308     }
309
310     /**
311      * Creates a list of retained topics related to the device
312      *
313      * @return Returns a list of relative topics
314      */
315     public List<String> getRetainedTopics() {
316         List<String> topics = new ArrayList<>(Stream.of(this.attributes.getClass().getDeclaredFields())
317                 .map(f -> String.format("%s/$%s", this.deviceID, f.getName())).collect(Collectors.toList()));
318
319         this.nodes.stream().map(n -> n.getRetainedTopics().stream().map(a -> String.format("%s/%s", this.deviceID, a))
320                 .collect(Collectors.toList())).collect(Collectors.toList()).forEach(topics::addAll);
321
322         return topics;
323     }
324 }