2 * Copyright (c) 2010-2024 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.homie.internal.homie300;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.stream.Collectors;
21 import java.util.stream.Stream;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
26 import org.openhab.binding.mqtt.generic.tools.ChildMap;
27 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
28 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
29 import org.openhab.core.thing.ChannelGroupUID;
30 import org.openhab.core.thing.ThingUID;
31 import org.openhab.core.thing.type.ChannelDefinition;
32 import org.openhab.core.thing.type.ChannelGroupDefinition;
33 import org.openhab.core.thing.type.ChannelGroupType;
34 import org.openhab.core.thing.type.ChannelGroupTypeBuilder;
35 import org.openhab.core.thing.type.ChannelGroupTypeUID;
36 import org.openhab.core.util.UIDUtils;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
43 * A Homie Node contains Homie Properties ({@link Property}) but can also have attributes ({@link NodeAttributes}).
44 * It corresponds to a ChannelGroup.
46 * @author David Graeff - Initial contribution
49 public class Node implements AbstractMqttAttributeClass.AttributeChanged {
50 private final Logger logger = LoggerFactory.getLogger(Node.class);
// ID of this node; used as the prefix of the relative topics built in getRetainedTopics()
// and assigned to attributes.name in nodeRestoredFromConfig().
52 public final String nodeID;
// Attribute object of this node; subscribed in subscribe() and unsubscribed in stop().
53 public final NodeAttributes attributes;
// Child properties of this node; updated from attributes.properties by applyProperties().
54 public ChildMap<Property> properties;
// Receives nodeAddedOrChanged(...) and propertyRemoved(...) notifications and is handed to every
// Property created by createProperty(...).
56 public final DeviceCallback callback;
// Built in the constructor from the thing UID and the encoded node ID.
57 protected final ChannelGroupUID channelGroupUID;
// Built in the constructor from the binding ID and the encoded node topic.
58 public final ChannelGroupTypeUID channelGroupTypeUID;
// Full node topic: the base topic given to the constructor, a "/" and the node ID.
59 private final String topic;
// Checked at the top of attributeChanged(...) together with allMandatoryFieldsReceived.
60 private boolean initialized = false;
63 * Creates a Homie Node.
65 * @param topic The base topic for this node (e.g. "homie/device")
66 * @param nodeID The node ID
67 * @param thingUID The Thing UID, used to determine the ChannelGroupUID.
68 * @param callback The callback for the handler.
// Note: the attributes argument (the NodeAttributes instance backing this node) is stored as-is;
// it is not covered by the @param tags of this constructor's Javadoc.
70 public Node(String topic, String nodeID, ThingUID thingUID, DeviceCallback callback, NodeAttributes attributes) {
71 this.attributes = attributes;
// Node topic = base topic + "/" + node ID. It is later passed to attributes.subscribeAndReceive(...)
// and to every Property created by this node.
72 this.topic = topic + "/" + nodeID;
74 this.callback = callback;
// The group type UID is derived from the full node topic, the group UID from the thing UID and the
// node ID; both string parts go through UIDUtils.encode first.
75 channelGroupTypeUID = new ChannelGroupTypeUID(MqttBindingConstants.BINDING_ID, UIDUtils.encode(this.topic));
76 channelGroupUID = new ChannelGroupUID(thingUID, UIDUtils.encode(nodeID));
// A new node starts with a freshly created ChildMap; applyProperties(...) updates it later.
77 properties = new ChildMap<>();
81 * Parse node properties. This will not subscribe to properties though. Call
82 * {@link Device#startChannels(MqttBrokerConnection, ScheduledExecutorService, int, HomieThingHandler)}
83 * as soon as the returned future has completed.
// Subscribes the node attributes under this node's topic with the given connection, scheduler and
// timeout. This node is passed as the fourth argument (presumably as the AttributeChanged listener;
// confirm against subscribeAndReceive). attributesReceived(...) is chained in via thenCompose, so it
// only runs when the attribute future completes normally.
85 public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
86 ScheduledExecutorService scheduler, int timeout) {
87 return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
88 // On success, create all properties and tell the handler about this node
89 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
90 // No matter if values have been received or not -> the subscriptions have been performed
// whenComplete is invoked for normal and for exceptional completion of the chain above.
91 .whenComplete((r, e) -> {
// Notifies the callback that this node was added or changed, then applies the property list via
// applyProperties(...) and returns the future produced by that call.
96 public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
97 ScheduledExecutorService scheduler, int timeout) {
98 callback.nodeAddedOrChanged(this);
99 return applyProperties(connection, scheduler, timeout);
// NOTE(review): judging by the name, this is called when the node is rebuilt from stored
// configuration instead of from received MQTT attributes; confirm with the callers.
102 public void nodeRestoredFromConfig() {
// The node ID is used as the node's name attribute.
104 attributes.name = nodeID;
108 * Unsubscribe from node attribute and also all property attributes and the property value
110 * @return Returns a future that completes as soon as all unsubscriptions have been performed.
112 public CompletableFuture<@Nullable Void> stop() {
// Order matters: the node attributes are unsubscribed first. Only after that future completes is
// Property.stop() called on every property; allOf(...) combines the resulting futures into one that
// completes when all of them have completed (immediately if there are no properties).
113 return attributes.unsubscribe().thenCompose(b -> CompletableFuture
114 .allOf(properties.stream().map(Property::stop).toArray(CompletableFuture[]::new)));
118 * Return the channel group type for this Node.
120 public ChannelGroupType type() {
// One channel definition per property. Objects.requireNonNull throws a NullPointerException if a
// property returns no channel definition.
121 final List<ChannelDefinition> channelDefinitions = properties.stream()
122 .map(p -> Objects.requireNonNull(p.getChannelDefinition())).collect(Collectors.toList());
// attributes.name is passed as the second builder argument (the label of the group type).
123 return ChannelGroupTypeBuilder.instance(channelGroupTypeUID, attributes.name)
124 .withChannelDefinitions(channelDefinitions).build();
// Builds the channel group definition for this node from the ID part of the channel group UID, the
// channel group type UID and attributes.name; the last constructor argument is passed as null.
127 public ChannelGroupDefinition getChannelGroupDefinition() {
128 return new ChannelGroupDefinition(channelGroupUID.getId(), channelGroupTypeUID, attributes.name, null);
132 * Return the channel group UID.
134 public ChannelGroupUID uid() {
// Plain accessor for the UID created in the constructor.
135 return channelGroupUID;
139 * Create a Homie Property for this Node.
141 * @param propertyID The property ID
142 * @return A Homie Property
144 public Property createProperty(String propertyID) {
// Same as the two-argument overload, but with a newly constructed PropertyAttributes instance.
145 return new Property(topic, this, propertyID, callback, new PropertyAttributes());
149 * Create a Homie Property for this Node.
151 * @param propertyID The property ID
152 * @param attributes The property attributes object
153 * @return A Homie Property
155 public Property createProperty(String propertyID, PropertyAttributes attributes) {
// The property receives this node's full topic, this node itself, the node's callback and the
// supplied attributes object.
156 return new Property(topic, this, propertyID, callback, attributes);
161 * The properties of a node are determined by the node attribute "$properties". If that attribute changes,
162 * {@link #attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)} is
163 * called. The {@link #properties} map will be synchronized and this method will be called for every removed
168 * This method will stop the property and will notify about the removed property.
171 * @param property The removed property.
173 protected void notifyPropertyRemoved(Property property) {
// Forwards the removal to the device callback.
175 callback.propertyRemoved(property);
// Brings the properties map in line with the IDs in attributes.properties by calling
// ChildMap.apply with three callbacks: subscribe a property (using the given connection, scheduler
// and timeout), create a property via createProperty(String), and report a removed property via
// notifyPropertyRemoved. Presumably apply(...) decides which callback is used for which ID; confirm
// against ChildMap. Objects.requireNonNull throws a NullPointerException if attributes.properties
// is null.
178 protected CompletableFuture<@Nullable Void> applyProperties(MqttBrokerConnection connection,
179 ScheduledExecutorService scheduler, int timeout) {
180 return properties.apply(Objects.requireNonNull(attributes.properties),
181 prop -> prop.subscribe(connection, scheduler, timeout), this::createProperty,
// A failure of the future returned by apply(...) is handled here and logged as a warning.
182 this::notifyPropertyRemoved).exceptionally(e -> {
183 logger.warn("Could not subscribe", e);
// Attribute-change notification for this node (method of AbstractMqttAttributeClass.AttributeChanged,
// which this class implements).
189 public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
190 ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
// Guard: taken while the node is not yet initialized or mandatory attributes are still missing.
191 if (!initialized || !allMandatoryFieldsReceived) {
194 // Special case: Not all fields were known before
// NOTE(review): 500 is a hard-coded timeout for the follow-up calls; the unit is not visible here
// (presumably milliseconds; confirm against subscribeAndReceive / Property.subscribe).
195 if (!attributes.isComplete()) {
196 attributesReceived(connection, scheduler, 500);
// A change of the "properties" attribute triggers a re-sync of the properties map.
197 } else if ("properties".equals(name)) {
198 applyProperties(connection, scheduler, 500);
// The callback is told that this node was added or changed.
200 callback.nodeAddedOrChanged(this);
204 public String toString() {
// The string form of a node is the string form of its channel group UID.
205 return channelGroupUID.toString();
209 * Creates a list of retained topics related to the node
211 * @return Returns a list of relative topics
213 public List<String> getRetainedTopics() {
// Attribute topics: one "<nodeID>/$<fieldName>" entry for every field declared on the runtime class
// of the attributes object (looked up by reflection via getDeclaredFields()).
214 List<String> topics = new ArrayList<>(Stream.of(this.attributes.getClass().getDeclaredFields())
215 .map(f -> String.format("%s/$%s", this.nodeID, f.getName())).collect(Collectors.toList()));
// Property topics: each property's own retained topics are prefixed with "<nodeID>/" and appended
// to the list.
217 this.properties.stream().map(p -> p.getRetainedTopics().stream()
218 .map(a -> String.format("%s/%s", this.nodeID, a)).collect(Collectors.toList()))
219 .collect(Collectors.toList()).forEach(topics::addAll);