]> git.basschouten.com Git - openhab-addons.git/blob
ffea8f4f4dc2978e9b6eaab78300d33288d63ce2
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.homie300;
14
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.stream.Collectors;
21 import java.util.stream.Stream;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.ChannelConfig;
26 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
27 import org.openhab.binding.mqtt.generic.tools.ChildMap;
28 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.thing.Channel;
31 import org.openhab.core.thing.ChannelUID;
32 import org.openhab.core.thing.ThingUID;
33 import org.openhab.core.util.UIDUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Homie 3.x Device. This is also the base class to subscribe to and parse a homie MQTT topic tree.
39  * First use {@link #subscribe(MqttBrokerConnection, ScheduledExecutorService, int)}
40  * to subscribe to the device/nodes/properties tree.
41  * If everything has been received and parsed, call
42  * {@link #startChannels(MqttBrokerConnection, ScheduledExecutorService, int, HomieThingHandler)}
43  * to also subscribe to the property values. Usage:
44  *
45  * <pre>
46  * Device device(thingUID, callback);
47  * device.subscribe(topicMapper,timeout).thenRun(()-> {
48  *   System.out.println("All attributes received. Device tree ready");
49  *   device.startChannels(connection, handler);
50  * });
51  * </pre>
52  *
53  * @author David Graeff - Initial contribution
54  */
55 @NonNullByDefault
56 public class Device implements AbstractMqttAttributeClass.AttributeChanged {
57     private final Logger logger = LoggerFactory.getLogger(Device.class);
58     // The device attributes, statistics and nodes of this device
59     public final DeviceAttributes attributes;
60     public final ChildMap<Node> nodes;
61
62     // The corresponding ThingUID and callback of this device object
63     public final ThingUID thingUID;
64     private final DeviceCallback callback;
65
66     // Unique identifier and topic
67     private String topic = "";
68     public String deviceID = "";
69     private boolean initialized = false;
70
71     /**
72      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
73      *
74      * @param thingUID The thing UID
75      * @param callback A callback, used to notify about new/removed nodes/properties and more.
76      * @param attributes The device attributes object
77      */
78     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes) {
79         this.thingUID = thingUID;
80         this.callback = callback;
81         this.attributes = attributes;
82         this.nodes = new ChildMap<>();
83     }
84
85     /**
86      * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
87      *
88      * @param thingUID The thing UID
89      * @param callback A callback, used to notify about new/removed nodes/properties and more.
90      * @param attributes The device attributes object
91      * @param nodes The nodes map
92      */
93     public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes, ChildMap<Node> nodes) {
94         this.thingUID = thingUID;
95         this.callback = callback;
96         this.attributes = attributes;
97         this.nodes = nodes;
98     }
99
100     /**
101      * Subscribe to all device attributes and device statistics. Parse the nodes
102      * and subscribe to all node attributes. Parse node properties. This will not subscribe
103      * to properties though. If subscribing to all necessary topics worked {@link #isInitialized()} will return true.
104      *
105      * Call {@link #startChannels(MqttBrokerConnection, ScheduledExecutorService, int, HomieThingHandler)} subsequently.
106      *
107      * @param connection A broker connection
108      * @param scheduler A scheduler to realize the timeout
109      * @param timeout A timeout in milliseconds
110      * @return A future that is complete as soon as all attributes, nodes and properties have been requested and have
111      *         been subscribed to.
112      */
113     public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
114             ScheduledExecutorService scheduler, int timeout) {
115         if (topic.isEmpty()) {
116             throw new IllegalStateException("You must call initialize()!");
117         }
118
119         return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
120                 // On success, create all nodes and tell the handler about the ready state
121                 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
122                 // No matter if values have been received or not -> the subscriptions have been performed
123                 .whenComplete((r, e) -> {
124                     initialized = true;
125                 });
126     }
127
128     public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
129             ScheduledExecutorService scheduler, int timeout) {
130         callback.readyStateChanged(attributes.state);
131         return applyNodes(connection, scheduler, timeout);
132     }
133
134     /**
135      * Subscribe to all property state topics. The handler will receive an update call for each
136      * received value. Therefore the thing channels should have been created before.
137      *
138      * @param connection A broker connection
139      * @param scheduler A scheduler to realize the timeout
140      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
141      * @param handler The Homie handler, that receives property (channel) updates.
142      * @return A future that is complete as soon as all properties have subscribed to their state topics.
143      */
144     public CompletableFuture<@Nullable Void> startChannels(MqttBrokerConnection connection,
145             ScheduledExecutorService scheduler, int timeout, HomieThingHandler handler) {
146         if (!isInitialized() || deviceID.isEmpty()) {
147             CompletableFuture<@Nullable Void> c = new CompletableFuture<>();
148             c.completeExceptionally(new Exception("Homie Device Tree not inialized yet."));
149             return c;
150         }
151
152         return CompletableFuture.allOf(nodes.stream().flatMap(node -> node.properties.stream())
153                 .map(p -> p.startChannel(connection, scheduler, timeout)).toArray(CompletableFuture[]::new));
154     }
155
156     /**
157      * Get a homie property (which translates to a channel).
158      *
159      * @param channelUID The group ID corresponds to the Homie Node, the channel ID (without group ID) corresponds to
160      *            the Nodes Property.
161      * @return A Homie Property, addressed by the given ChannelUID
162      */
163     @SuppressWarnings({ "null", "unused" })
164     public @Nullable Property getProperty(ChannelUID channelUID) {
165         final String groupId = channelUID.getGroupId();
166         if (groupId == null) {
167             return null;
168         }
169         Node node = nodes.get(UIDUtils.decode(groupId));
170         if (node == null) {
171             return null;
172         }
173         return node.properties.get(UIDUtils.decode(channelUID.getIdWithoutGroup()));
174     }
175
176     /**
177      * Unsubscribe from everything.
178      */
179     public CompletableFuture<@Nullable Void> stop() {
180         return attributes.unsubscribe().thenCompose(
181                 b -> CompletableFuture.allOf(nodes.stream().map(Node::stop).toArray(CompletableFuture[]::new)));
182     }
183
184     /**
185      * Return all homie nodes on this device
186      */
187     public ChildMap<Node> nodes() {
188         return nodes;
189     }
190
191     /**
192      * @return Return true if this device is initialized
193      */
194     public boolean isInitialized() {
195         return initialized;
196     }
197
198     /**
199      * Restore Nodes and Properties from Thing channels after handler initalization.
200      *
201      * @param channels
202      */
203     @SuppressWarnings({ "null", "unused" })
204     public void initialize(String baseTopic, String deviceID, List<Channel> channels) {
205         this.topic = baseTopic + "/" + deviceID;
206         this.deviceID = deviceID;
207         nodes.clear();
208         for (Channel channel : channels) {
209             final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
210             if (!channelConfig.commandTopic.isEmpty() && !channelConfig.retained) {
211                 logger.warn("Channel {} in device {} is missing the 'retained' flag. Check your configuration.",
212                         channel.getUID(), deviceID);
213             }
214             final String channelGroupId = channel.getUID().getGroupId();
215             if (channelGroupId == null) {
216                 continue;
217             }
218             final String nodeID = UIDUtils.decode(channelGroupId);
219             final String propertyID = UIDUtils.decode(channel.getUID().getIdWithoutGroup());
220             Node node = nodes.get(nodeID);
221             if (node == null) {
222                 node = createNode(nodeID);
223                 node.nodeRestoredFromConfig();
224                 nodes.put(nodeID, node);
225             }
226             // Restores the properties attribute object via the channels configuration.
227             Property property = node.createProperty(propertyID,
228                     channel.getConfiguration().as(PropertyAttributes.class));
229             property.attributesReceived();
230
231             node.properties.put(propertyID, property);
232         }
233     }
234
235     /**
236      * Creates a new Homie Node, a child of this Homie Device.
237      *
238      * <p>
239      * Implementation detail: Cannot be used for mocking or spying within tests.
240      * </p>
241      *
242      * @param nodeID The node ID
243      * @return A child node
244      */
245     public Node createNode(String nodeID) {
246         return new Node(topic, nodeID, thingUID, callback, new NodeAttributes());
247     }
248
249     /**
250      * Creates a new Homie Node, a child of this Homie Device.
251      *
252      * @param nodeID The node ID
253      * @param attributes The node attributes object
254      * @return A child node
255      */
256     public Node createNode(String nodeID, NodeAttributes attributes) {
257         return new Node(topic, nodeID, thingUID, callback, attributes);
258     }
259
260     /**
261      * <p>
262      * The nodes of a device are determined by the device attribute "$nodes". If that attribute changes,
263      * {@link #attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)} is
264      * called. The {@link #nodes} map will be synchronized and this method will be called for every removed node.
265      * </p>
266      *
267      * <p>
268      * This method will stop the node and will notify about the removed node all removed properties.
269      * </p>
270      *
271      * @param node The removed node.
272      */
273     protected void notifyNodeRemoved(Node node) {
274         node.stop();
275         node.properties.stream().forEach(property -> node.notifyPropertyRemoved(property));
276         callback.nodeRemoved(node);
277     }
278
279     CompletableFuture<@Nullable Void> applyNodes(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
280             int timeout) {
281         return nodes.apply(Objects.requireNonNull(attributes.nodes),
282                 node -> node.subscribe(connection, scheduler, timeout), this::createNode, this::notifyNodeRemoved)
283                 .exceptionally(e -> {
284                     logger.warn("Could not subscribe", e);
285                     return null;
286                 });
287     }
288
289     @Override
290     public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
291             ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
292         if (!initialized || !allMandatoryFieldsReceived) {
293             return;
294         }
295         // Special case: Not all fields were known before
296         if (!attributes.isComplete()) {
297             attributesReceived(connection, scheduler, 500);
298         } else {
299             switch (name) {
300                 case "state": {
301                     callback.readyStateChanged(attributes.state);
302                     return;
303                 }
304                 case "nodes": {
305                     applyNodes(connection, scheduler, 500);
306                     return;
307                 }
308             }
309         }
310     }
311
312     /**
313      * Creates a list of retained topics related to the device
314      *
315      * @return Returns a list of relative topics
316      */
317     public List<String> getRetainedTopics() {
318         List<String> topics = new ArrayList<>(Stream.of(this.attributes.getClass().getDeclaredFields())
319                 .map(f -> String.format("%s/$%s", this.deviceID, f.getName())).collect(Collectors.toList()));
320
321         this.nodes.stream().map(n -> n.getRetainedTopics().stream().map(a -> String.format("%s/%s", this.deviceID, a))
322                 .collect(Collectors.toList())).collect(Collectors.toList()).forEach(topics::addAll);
323
324         return topics;
325     }
326 }