]> git.basschouten.com Git - openhab-addons.git/blob
dc2ed54565f304baa3fb172eb788aae739ecc78e
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2020 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.homie.internal.homie300;
14
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CompletableFuture;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.stream.Collectors;
20 import java.util.stream.Stream;
21
22 import org.eclipse.jdt.annotation.NonNullByDefault;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.openhab.binding.mqtt.generic.mapping.AbstractMqttAttributeClass;
25 import org.openhab.binding.mqtt.generic.tools.ChildMap;
26 import org.openhab.binding.mqtt.homie.generic.internal.MqttBindingConstants;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.thing.ChannelGroupUID;
29 import org.openhab.core.thing.ThingUID;
30 import org.openhab.core.thing.type.ChannelDefinition;
31 import org.openhab.core.thing.type.ChannelDefinitionBuilder;
32 import org.openhab.core.thing.type.ChannelGroupType;
33 import org.openhab.core.thing.type.ChannelGroupTypeBuilder;
34 import org.openhab.core.thing.type.ChannelGroupTypeUID;
35 import org.openhab.core.util.UIDUtils;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Homie 3.x Node.
41  *
42  * A Homie Node contains Homie Properties ({@link Property}) but can also have attributes ({@link NodeAttributes}).
43  * It corresponds to an ESH ChannelGroup.
44  *
45  * @author David Graeff - Initial contribution
46  */
47 @NonNullByDefault
48 public class Node implements AbstractMqttAttributeClass.AttributeChanged {
49     private final Logger logger = LoggerFactory.getLogger(Node.class);
50     // Homie
51     public final String nodeID;
52     public final NodeAttributes attributes;
53     public ChildMap<Property> properties;
54     // Runtime
55     public final DeviceCallback callback;
56     // ESH
57     protected final ChannelGroupUID channelGroupUID;
58     public final ChannelGroupTypeUID channelGroupTypeUID;
59     private final String topic;
60     private boolean initialized = false;
61
62     /**
63      * Creates a Homie Node.
64      *
65      * @param topic The base topic for this node (e.g. "homie/device")
66      * @param nodeID The node ID
67      * @param thingUID The Thing UID, used to determine the ChannelGroupUID.
68      * @param callback The callback for the handler.
69      */
70     public Node(String topic, String nodeID, ThingUID thingUID, DeviceCallback callback, NodeAttributes attributes) {
71         this.attributes = attributes;
72         this.topic = topic + "/" + nodeID;
73         this.nodeID = nodeID;
74         this.callback = callback;
75         channelGroupTypeUID = new ChannelGroupTypeUID(MqttBindingConstants.BINDING_ID, UIDUtils.encode(this.topic));
76         channelGroupUID = new ChannelGroupUID(thingUID, UIDUtils.encode(nodeID));
77         properties = new ChildMap<>();
78     }
79
80     /**
81      * Parse node properties. This will not subscribe to properties though. Call
82      * {@link Device#startChannels(MqttBrokerConnection)} as soon as the returned future has
83      * completed.
84      */
85     public CompletableFuture<@Nullable Void> subscribe(MqttBrokerConnection connection,
86             ScheduledExecutorService scheduler, int timeout) {
87         return attributes.subscribeAndReceive(connection, scheduler, topic, this, timeout)
88                 // On success, create all properties and tell the handler about this node
89                 .thenCompose(b -> attributesReceived(connection, scheduler, timeout))
90                 // No matter if values have been received or not -> the subscriptions have been performed
91                 .whenComplete((r, e) -> {
92                     initialized = true;
93                 });
94     }
95
96     public CompletableFuture<@Nullable Void> attributesReceived(MqttBrokerConnection connection,
97             ScheduledExecutorService scheduler, int timeout) {
98         callback.nodeAddedOrChanged(this);
99         return applyProperties(connection, scheduler, timeout);
100     }
101
102     public void nodeRestoredFromConfig() {
103         initialized = true;
104     }
105
106     /**
107      * Unsubscribe from node attribute and also all property attributes and the property value
108      *
109      * @param connection A broker connection
110      * @return Returns a future that completes as soon as all unsubscriptions have been performed.
111      */
112     public CompletableFuture<@Nullable Void> stop() {
113         return attributes.unsubscribe().thenCompose(b -> CompletableFuture
114                 .allOf(properties.stream().map(Property::stop).toArray(CompletableFuture[]::new)));
115     }
116
117     /**
118      * Return the channel group type for this Node.
119      */
120     public ChannelGroupType type() {
121         final List<ChannelDefinition> channelDefinitions = properties.stream()
122                 .map(c -> new ChannelDefinitionBuilder(c.propertyID, c.channelTypeUID).build())
123                 .collect(Collectors.toList());
124         return ChannelGroupTypeBuilder.instance(channelGroupTypeUID, attributes.name)
125                 .withChannelDefinitions(channelDefinitions).build();
126     }
127
128     /**
129      * Return the channel group UID.
130      */
131     public ChannelGroupUID uid() {
132         return channelGroupUID;
133     }
134
135     /**
136      * Create a Homie Property for this Node.
137      *
138      * @param propertyID The property ID
139      * @return A Homie Property
140      */
141     public Property createProperty(String propertyID) {
142         return new Property(topic, this, propertyID, callback, new PropertyAttributes());
143     }
144
145     /**
146      * Create a Homie Property for this Node.
147      *
148      * @param propertyID The property ID
149      * @param attributes The node attributes object
150      * @return A Homie Property
151      */
152     public Property createProperty(String propertyID, PropertyAttributes attributes) {
153         return new Property(topic, this, propertyID, callback, attributes);
154     }
155
156     /**
157      * <p>
158      * The properties of a node are determined by the node attribute "$properties". If that attribute changes,
159      * {@link #attributeChanged(CompletableFuture, String, Object, MqttBrokerConnection, ScheduledExecutorService)} is
160      * called. The {@link #properties} map will be synchronized and this method will be called for every removed
161      * property.
162      * </p>
163      *
164      * <p>
165      * This method will stop the property and will notify about the removed property.
166      * </p>
167      *
168      * @param property The removed property.
169      */
170     protected void notifyPropertyRemoved(Property property) {
171         property.stop();
172         callback.propertyRemoved(property);
173     }
174
175     protected CompletableFuture<@Nullable Void> applyProperties(MqttBrokerConnection connection,
176             ScheduledExecutorService scheduler, int timeout) {
177         return properties.apply(attributes.properties, prop -> prop.subscribe(connection, scheduler, timeout),
178                 this::createProperty, this::notifyPropertyRemoved).exceptionally(e -> {
179                     logger.warn("Could not subscribe", e);
180                     return null;
181                 });
182     }
183
184     @Override
185     public void attributeChanged(String name, Object value, MqttBrokerConnection connection,
186             ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived) {
187         if (!initialized || !allMandatoryFieldsReceived) {
188             return;
189         }
190         // Special case: Not all fields were known before
191         if (!attributes.isComplete()) {
192             attributesReceived(connection, scheduler, 500);
193         } else {
194             if ("properties".equals(name)) {
195                 applyProperties(connection, scheduler, 500);
196             }
197         }
198         callback.nodeAddedOrChanged(this);
199     }
200
201     @Override
202     public String toString() {
203         return channelGroupUID.toString();
204     }
205
206     /**
207      * Creates a list of retained topics related to the node
208      *
209      * @return Returns a list of relative topics
210      */
211     public List<String> getRetainedTopics() {
212         List<String> topics = new ArrayList<>();
213
214         topics.addAll(Stream.of(this.attributes.getClass().getDeclaredFields()).map(f -> {
215             return String.format("%s/$%s", this.nodeID, f.getName());
216         }).collect(Collectors.toList()));
217
218         this.properties.stream().map(p -> p.getRetainedTopics().stream().map(a -> {
219             return String.format("%s/%s", this.nodeID, a);
220         }).collect(Collectors.toList())).collect(Collectors.toList()).forEach(topics::addAll);
221
222         return topics;
223     }
224 }