2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homie.internal.homie300;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.stream.Collectors;
21 import java.util.stream.Stream;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.ChannelConfig;
26 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
27 import org.openhab.binding.mqtt.generic.tools.ChildMap;
28 import org.openhab.binding.mqtt.homie.internal.handler.HomieThingHandler;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.thing.Channel;
31 import org.openhab.core.thing.ChannelUID;
32 import org.openhab.core.thing.ThingUID;
33 import org.openhab.core.util.UIDUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Homie 3.x Device. This is also the base class to subscribe to and parse a homie MQTT topic tree.
39 * First use {@link #subscribe(MqttBrokerConnection, ScheduledExecutorService, int)}
40 * to subscribe to the device/nodes/properties tree.
41 * If everything has been received and parsed, call
42 * {@link #startChannels(MqttBrokerConnection, ScheduledExecutorService, int, HomieThingHandler)}
43 * to also subscribe to the property values. Usage:
46 * Device device = new Device(thingUID, callback, attributes);
47 * device.subscribe(connection, scheduler, timeout).thenRun(() -> {
48 * System.out.println("All attributes received. Device tree ready");
49 * device.startChannels(connection, scheduler, timeout, handler);
53 * @author David Graeff - Initial contribution
56 public class Device implements AbstractMqttAttributeClass.AttributeChanged {
57 private final Logger logger = LoggerFactory.getLogger(Device.class);
58 // The device attributes, statistics and nodes of this device
59 public final DeviceAttributes attributes;
60 public final ChildMap<Node> nodes;
62 // The corresponding ThingUID and callback of this device object
63 public final ThingUID thingUID;
64 private final DeviceCallback callback;
66 // Unique identifier and topic
67 private String topic = "";
68 public String deviceID = "";
69 private boolean initialized = false;
/**
 * Builds a Homie Device structure made up of device attributes, device statistics and nodes.
 * The node map is newly created by this constructor.
 *
 * @param thingUID The UID of the corresponding thing
 * @param callback Receives notifications about new/removed nodes/properties and more
 * @param attributes The object holding the device attributes
 */
public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes) {
    this.nodes = new ChildMap<>();
    this.attributes = attributes;
    this.callback = callback;
    this.thingUID = thingUID;
}
/**
 * Creates a Homie Device structure. It consists of device attributes, device statistics and nodes.
 * In contrast to the three-argument constructor, the node map is supplied by the caller.
 *
 * @param thingUID The thing UID
 * @param callback A callback, used to notify about new/removed nodes/properties and more.
 * @param attributes The device attributes object
 * @param nodes The nodes map
 */
public Device(ThingUID thingUID, DeviceCallback callback, DeviceAttributes attributes, ChildMap<Node> nodes) {
    this.thingUID = thingUID;
    this.callback = callback;
    this.attributes = attributes;
    // The final field must be assigned here; without this the supplied map would be silently ignored.
    this.nodes = nodes;
}
101 * Subscribe to all device attributes and device statistics. Parse the nodes
102 * and subscribe to all node attributes. Parse node properties. This will not subscribe
103 * to properties though. If subscribing to all necessary topics worked {@link #isInitialized()} will return true.
105 * Call {@link #startChannels(MqttBrokerConnection, ScheduledExecutorService, int, HomieThingHandler)} subsequently.
107 * @param connection A broker connection
108 * @param scheduler A scheduler to realize the timeout
109 * @param timeout A timeout in milliseconds
110 * @return A future that is complete as soon as all attributes, nodes and properties have been requested and have
111 * been subscribed to.
113 public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
114 ScheduledExecutorService scheduler, int timeout) {
115 if (topic.isEmpty()) {
116 throw new IllegalStateException("You must call initialize()!");
119 return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
120 // On success, create all nodes and tell the handler about the ready state
121 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
122 // No matter if values have been received or not -> the subscriptions have been performed
123 .whenComplete((r, e) -> {
128 public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
129 ScheduledExecutorService scheduler, int timeout) {
130 callback.readyStateChanged(attributes.state);
131 return applyNodes(connection, scheduler, timeout);
135 * Subscribe to all property state topics. The handler will receive an update call for each
136 * received value. Therefore the thing channels should have been created before.
138 * @param connection A broker connection
139 * @param scheduler A scheduler to realize the timeout
140 * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
141 * @param handler The Homie handler, that receives property (channel) updates.
142 * @return A future that is complete as soon as all properties have subscribed to their state topics.
144 public CompletableFuture<@Nullable Void> startChannels(MqttBrokerConnection connection,
145 ScheduledExecutorService scheduler, int timeout, HomieThingHandler handler) {
146 if (!isInitialized() || deviceID.isEmpty()) {
147 CompletableFuture<@Nullable Void> c = new CompletableFuture<>();
148 c.completeExceptionally(new Exception("Homie Device Tree not inialized yet."));
152 return CompletableFuture.allOf(nodes.stream().flatMap(node -> node.properties.stream())
153 .map(p -> p.startChannel(connection, scheduler, timeout)).toArray(CompletableFuture[]::new));
157 * Get a homie property (which translates to a channel).
159 * @param channelUID The group ID corresponds to the Homie Node, the channel ID (without group ID) corresponds to
160 * the Nodes Property.
161 * @return A Homie Property, addressed by the given ChannelUID
163 @SuppressWarnings({ "null", "unused" })
164 public @Nullable Property getProperty(ChannelUID channelUID) {
165 final String groupId = channelUID.getGroupId();
166 if (groupId == null) {
169 Node node = nodes.get(UIDUtils.decode(groupId));
173 return node.properties.get(UIDUtils.decode(channelUID.getIdWithoutGroup()));
177 * Unsubscribe from everything.
179 public CompletableFuture<@Nullable Void> stop() {
180 return attributes.unsubscribe().thenCompose(
181 b -> CompletableFuture.allOf(nodes.stream().map(Node::stop).toArray(CompletableFuture[]::new)));
185 * Return all homie nodes on this device
187 public ChildMap<Node> nodes() {
192 * @return Return true if this device is initialized
194 public boolean isInitialized() {
199 * Restore Nodes and Properties from Thing channels after handler initialization.
// Restores nodes and properties of this device from the given thing channels.
//
// baseTopic: prefix of the device topic; the device topic becomes baseTopic + "/" + deviceID
// deviceID:  identifier of this device; stored in the public deviceID field
// channels:  thing channels; group ID and channel ID (without group) are decoded into node ID and property ID
//
// NOTE(review): this excerpt seems to lack several lines (closing braces, the statement inside the
// channelGroupId null check, and presumably a condition around createNode(...)) - confirm against the full file
// before changing any logic here.
203 @SuppressWarnings({ "null", "unused" })
204 public void initialize(String baseTopic, String deviceID, List<Channel> channels) {
// A non-empty topic is what subscribe(...) checks before it proceeds.
205 this.topic = baseTopic + "/" + deviceID;
206 this.deviceID = deviceID;
208 for (Channel channel : channels) {
209 final ChannelConfig channelConfig = channel.getConfiguration().as(ChannelConfig.class);
// Channels with a command topic but without the retained flag only produce a warning.
210 if (!channelConfig.commandTopic.isEmpty() && !channelConfig.retained) {
211 logger.warn("Channel {} in device {} is missing the 'retained' flag. Check your configuration.",
212 channel.getUID(), deviceID);
214 final String channelGroupId = channel.getUID().getGroupId();
// NOTE(review): the body of this null check is not visible; presumably channels without a group are skipped.
215 if (channelGroupId == null) {
218 final String nodeID = UIDUtils.decode(channelGroupId);
219 final String propertyID = UIDUtils.decode(channel.getUID().getIdWithoutGroup());
// Look up the node of this channel; the lines below create, flag and register a node for nodeID.
// NOTE(review): presumably that only happens when the lookup found nothing - the condition is not visible.
220 Node node = nodes.get(nodeID);
222 node = createNode(nodeID);
223 node.nodeRestoredFromConfig();
224 nodes.put(nodeID, node);
226 // Restores the properties attribute object via the channels configuration.
227 Property property = node.createProperty(propertyID,
228 channel.getConfiguration().as(PropertyAttributes.class));
// Signal the property that its attributes are available, then register it on the node.
229 property.attributesReceived();
231 node.properties.put(propertyID, property);
236 * Creates a new Homie Node, a child of this Homie Device.
239 * Implementation detail: Cannot be used for mocking or spying within tests.
242 * @param nodeID The node ID
243 * @return A child node
245 public Node createNode(String nodeID) {
246 return new Node(topic, nodeID, thingUID, callback, new NodeAttributes());
250 * Creates a new Homie Node, a child of this Homie Device.
252 * @param nodeID The node ID
253 * @param attributes The node attributes object
254 * @return A child node
256 public Node createNode(String nodeID, NodeAttributes attributes) {
257 return new Node(topic, nodeID, thingUID, callback, attributes);
262 * The nodes of a device are determined by the device attribute "$nodes". If that attribute changes,
263 * {@link #attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)} is
264 * called. The {@link #nodes} map will be synchronized and this method will be called for every removed node.
268 * This method will stop the node and will notify about the removed node all removed properties.
271 * @param node The removed node.
273 protected void notifyNodeRemoved(Node node) {
275 node.properties.stream().forEach(property -> node.notifyPropertyRemoved(property));
276 callback.nodeRemoved(node);
279 CompletableFuture<@Nullable Void> applyNodes(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
281 return nodes.apply(Objects.requireNonNull(attributes.nodes),
282 node -> node.subscribe(connection, scheduler, timeout), this::createNode, this::notifyNodeRemoved)
283 .exceptionally(e -> {
284 logger.warn("Could not subscribe", e);
// Reacts to a changed device attribute.
//
// name:                       name of the changed attribute
// value:                      new value of the attribute; not referenced in the visible lines
// connection, scheduler:      handed on to attributesReceived(...) / applyNodes(...)
// allMandatoryFieldsReceived: changes are only processed when this is true
//
// NOTE(review): this excerpt seems to lack several lines (closing braces and the construct that selects
// between the two trailing calls, presumably a dispatch on `name`) - confirm against the full file.
290 public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
291 ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
// Guard on the device being initialized and all mandatory fields being present.
// NOTE(review): the guard's body is not visible; presumably the change is ignored here.
292 if (!initialized || !allMandatoryFieldsReceived) {
295 // Special case: Not all fields were known before
// Incomplete attributes: run attributesReceived(...) with a fixed timeout of 500.
296 if (!attributes.isComplete()) {
297 attributesReceived(connection, scheduler, 500);
// Forwards the current device state to the callback.
301 callback.readyStateChanged(attributes.state);
// Re-synchronizes the node map, again with a fixed timeout of 500.
305 applyNodes(connection, scheduler, 500);
313 * Creates a list of retained topics related to the device
315 * @return Returns a list of relative topics
317 public List<String> getRetainedTopics() {
318 List<String> topics = new ArrayList<>(Stream.of(this.attributes.getClass().getDeclaredFields())
319 .map(f -> String.format("%s/$%s", this.deviceID, f.getName())).collect(Collectors.toList()));
321 this.nodes.stream().map(n -> n.getRetainedTopics().stream().map(a -> String.format("%s/%s", this.deviceID, a))
322 .collect(Collectors.toList())).collect(Collectors.toList()).forEach(topics::addAll);