]> git.basschouten.com Git - openhab-addons.git/blob
2915f4d31b8cd25e8bfc143d41a621031067d217
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.mapping;
14
15 import java.lang.ref.WeakReference;
16 import java.lang.reflect.Field;
17 import java.lang.reflect.Modifier;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.stream.Collectors;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * <p>
33  * MQTT does not directly support key-value configuration maps. We do need those for device discovery and thing
34  * configuration though.<br>
35  * Different conventions came up with different solutions, and one is to have "attribute classes".
36  * </p>
37  *
38  * <p>
39  * An attribute class is a java class that extends {@link AbstractMqttAttributeClass} and
40  * contains annotated fields where each field corresponds to a MQTT topic.
41  * To automatically subscribe to all fields, a call to
42  * {@link #subscribeAndReceive(MqttBrokerConnection, ScheduledExecutorService, String, AttributeChanged, int)} is
43  * required.
44  * Unsubscribe with a call to {@link #unsubscribe()}.
45  * </p>
46  *
47  * <p>
48  * The Homie 3.x convention uses attribute classes for Devices, Nodes and Properties configuration.
49  * </p>
50  *
51  * <p>
52  * The given object, called bean in this context, can consist of all basic java types boolean, int, double, long,
53  * String, respective object wrappers like Integer, Double, Long, the BigDecimal type and Enum types. Enums need to be
54  * declared within the bean class though. Arrays like String[] are supported as well, but require an annotation because
55  * the separator needs to be known.
56  * </p>
57  * A topic prefix can be defined for the entire class or for a single field. A field annotation overwrites a class
58  * annotation.
59  *
60  * An example:
61  *
62  * <pre>
63  * &#64;TopicPrefix("$")
64  * class MyAttributes extends AbstractMqttAttributeClass {
65  *    public String testString;
66  *    public @MapToField(splitCharacter=",") String[] multipleStrings;
67  *
68  *    public int anInt = 2;
69  *
70  *    public enum AnEnum {
71  *      Value1,
72  *      Value2
73  *    };
74  *    public AnEnum anEnum = AnEnum.Value1;
75  *
76  *    public BigDecimal aDecimalValue
77  *
78  *    &#64;Override
79  *    public Object getFieldsOf() {
80  *        return this;
81  *    }
82  * };
83  * </pre>
84  *
85  * You would use this class in this way:
86  *
87  * <pre>
88  * MyAttributes bean = new MyAttributes();
89  * bean.subscribe(connection, new ScheduledExecutorService(), "mqtt/topic/bean", null, 500)
90  *         .thenRun(() -> System.out.println("subscribed"));
91  * </pre>
92  *
93  * The above attribute class would end up with subscriptions to "mqtt/topic/bean/$testString",
94  * "mqtt/topic/bean/$multipleStrings", "mqtt/topic/bean/$anInt" and so on. It is assumed that all MQTT messages are
95  * UTF-8 strings.
96  *
97  * @author David Graeff - Initial contribution
98  */
99 @NonNullByDefault
100 public abstract class AbstractMqttAttributeClass implements SubscribeFieldToMQTTtopic.FieldChanged {
101     private final Logger logger = LoggerFactory.getLogger(AbstractMqttAttributeClass.class);
102     protected transient List<SubscribeFieldToMQTTtopic> subscriptions = new ArrayList<>();
103     public transient WeakReference<@Nullable MqttBrokerConnection> connection = new WeakReference<>(null);
104     protected transient WeakReference<@Nullable ScheduledExecutorService> scheduler = new WeakReference<>(null);
105     private final String prefix;
106     private transient String basetopic = "";
107     protected transient AttributeChanged attributeChangedListener = (b, c, d, e, f) -> {
108     };
109     private transient boolean complete = false;
110
111     /**
112      * Implement this interface to be notified of an updated field.
113      */
114     public interface AttributeChanged {
115         /**
116          * An attribute has changed
117          *
118          * @param name The name of the field
119          * @param value The new value
120          * @param connection The broker connection
121          * @param scheduler The scheduler that was used for timeouts
122          * @param allMandatoryFieldsReceived True if now all mandatory fields have values
123          */
124         void attributeChanged(String name, Object value, MqttBrokerConnection connection,
125                 ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived);
126     }
127
128     @SuppressWarnings("null")
129     protected AbstractMqttAttributeClass() {
130         TopicPrefix topicUsesPrefix = getFieldsOf().getClass().getAnnotation(TopicPrefix.class);
131         prefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : "";
132     }
133
134     /**
135      * Unsubscribe from all topics of the managed object.
136      *
137      * @return Returns a future that completes as soon as all unsubscriptions have been performed.
138      */
139     public CompletableFuture<@Nullable Void> unsubscribe() {
140         final MqttBrokerConnection connection = this.connection.get();
141         if (connection == null) {
142             subscriptions.clear();
143             return CompletableFuture.completedFuture(null);
144         }
145
146         final CompletableFuture<?>[] futures = subscriptions.stream().map(m -> connection.unsubscribe(m.topic, m))
147                 .toArray(CompletableFuture[]::new);
148         subscriptions.clear();
149         return CompletableFuture.allOf(futures);
150     }
151
152     /**
153      * Subscribe to all subtopics on a MQTT broker connection base topic that match field names of s java object.
154      * The fields will be kept in sync with their respective topics. Optionally, you can register update-observers for
155      * specific fields.
156      *
157      * @param connection A MQTT broker connection.
158      * @param scheduler A scheduler for timeouts.
159      * @param basetopic The base topic. Given a base topic of "base/topic", a field "test" would be registered as
160      *            "base/topic/test".
161      * @param attributeChangedListener Field change listener
162      * @param timeout Timeout per subscription in milliseconds. The returned future completes after this time
163      *            even if no
164      *            message has been received for a single MQTT topic.
165      * @return Returns a future that completes as soon as values for all subscriptions have been received or have timed
166      *         out.
167      */
168     public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection,
169             ScheduledExecutorService scheduler, String basetopic, @Nullable AttributeChanged attributeChangedListener,
170             int timeout) {
171         // We first need to unsubscribe old subscriptions if any
172         final CompletableFuture<@Nullable Void> startFuture;
173         if (!subscriptions.isEmpty()) {
174             startFuture = unsubscribe();
175         } else {
176             startFuture = CompletableFuture.completedFuture(null);
177         }
178
179         this.connection = new WeakReference<>(connection);
180         this.scheduler = new WeakReference<>(scheduler);
181         this.basetopic = basetopic;
182         if (attributeChangedListener != null) {
183             this.attributeChangedListener = attributeChangedListener;
184         } else {
185             this.attributeChangedListener = (b, c, d, e, f) -> {
186             };
187         }
188
189         subscriptions = getAllFields(getFieldsOf().getClass()).stream().filter(AbstractMqttAttributeClass::filterField)
190                 .map(this::mapFieldToSubscriber).collect(Collectors.toList());
191
192         final CompletableFuture<?>[] futures = subscriptions.stream()
193                 .map(m -> m.subscribeAndReceive(connection, timeout)).toArray(CompletableFuture[]::new);
194         return CompletableFuture.allOf(startFuture, CompletableFuture.allOf(futures));
195     }
196
197     /**
198      * Return fields of the given class as well as all super classes.
199      *
200      * @param clazz The class
201      * @return A list of Field objects
202      */
203     protected static List<Field> getAllFields(Class<?> clazz) {
204         List<Field> fields = new ArrayList<>();
205
206         Class<?> currentClass = clazz;
207         while (currentClass != null) {
208             fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));
209             currentClass = currentClass.getSuperclass();
210         }
211
212         return fields;
213     }
214
215     /**
216      * Return true if the given field is not final and not transient
217      */
218     protected static boolean filterField(Field field) {
219         return !Modifier.isFinal(field.getModifiers()) && !Modifier.isTransient(field.getModifiers())
220                 && !Modifier.isStatic(field.getModifiers());
221     }
222
223     /**
224      * Maps the given field to a newly created {@link SubscribeFieldToMQTTtopic}.
225      * Requires the scheduler of this class to be set.
226      *
227      * @param field A field
228      * @return A newly created {@link SubscribeFieldToMQTTtopic}.
229      */
230     protected SubscribeFieldToMQTTtopic mapFieldToSubscriber(Field field) {
231         final ScheduledExecutorService scheduler = this.scheduler.get();
232         if (scheduler == null) {
233             throw new IllegalStateException("No scheduler set!");
234         }
235
236         MandatoryField mandatoryField = field.getAnnotation(MandatoryField.class);
237         @SuppressWarnings("null")
238         boolean mandatory = mandatoryField != null;
239
240         TopicPrefix topicUsesPrefix = field.getAnnotation(TopicPrefix.class);
241         @SuppressWarnings("null")
242         String localPrefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : prefix;
243
244         final String topic = basetopic + "/" + localPrefix + field.getName();
245
246         return createSubscriber(scheduler, field, topic, mandatory);
247     }
248
249     /**
250      * Creates a field subscriber for the given field on the given object
251      *
252      * @param scheduler A scheduler for the timeout functionality
253      * @param field The field
254      * @param topic The full topic to subscribe to
255      * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
256      * @return Returns a MQTT message subscriber for a single class field
257      */
258     public SubscribeFieldToMQTTtopic createSubscriber(ScheduledExecutorService scheduler, Field field, String topic,
259             boolean mandatory) {
260         return new SubscribeFieldToMQTTtopic(scheduler, field, this, topic, mandatory);
261     }
262
263     /**
264      * Return true if this attribute class has received a value for each mandatory field.
265      * In contrast to the parameter "allMandatoryFieldsReceived" of
266      * {@link AttributeChanged#attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)}
267      * this flag will only be updated after that call.
268      *
269      * <p>
270      * You can use this behaviour to compare if a changed field was the last one to complete
271      * this attribute class. E.g.:
272      * </p>
273      *
274      * <pre>
275      * {@code
276      * void attributeChanged(..., boolean allMandatoryFieldsReceived) {
277      *   if (allMandatoryFieldsReceived && !attributes.isComplete()) {
278      *      // The attribute class is now complete but wasn't before...
279      *   }
280      * }
281      * }
282      * </pre>
283      */
284     public boolean isComplete() {
285         return complete;
286     }
287
288     /**
289      * One of the observed MQTT topics got a new value. Apply this to the given field now
290      * and propagate the changed value event.
291      */
292     @Override
293     public void fieldChanged(Field field, Object value) {
294         // This object holds only a weak reference to connection and scheduler.
295         // Attribute classes should perform an unsubscribe when a connection is lost.
296         // We fail the future exceptionally here if that didn't happen so that everyone knows.
297         final MqttBrokerConnection connection = this.connection.get();
298         final ScheduledExecutorService scheduler = this.scheduler.get();
299         if (connection == null || scheduler == null) {
300             logger.warn("No connection or scheduler set!");
301             return;
302         }
303         // Set field. It is not a reason to fail the future exceptionally if a field could not be set.
304         // But at least issue a warning to the log.
305         try {
306             field.set(getFieldsOf(), value);
307             final boolean newComplete = !subscriptions.stream().anyMatch(s -> s.isMandatory() && !s.hasReceivedValue());
308             attributeChangedListener.attributeChanged(field.getName(), value, connection, scheduler, newComplete);
309             complete = newComplete;
310         } catch (IllegalArgumentException | IllegalAccessException e) {
311             logger.warn("Could not assign value {} to field {}", value, field, e);
312         }
313     }
314
315     /**
316      * Implement this method in your field class and return "this".
317      */
318     public abstract Object getFieldsOf();
319 }