]> git.basschouten.com Git - openhab-addons.git/blob
cfb27ab9f6e18d79dc7bf57c20dd93461cce8b6d
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.mapping;
14
15 import java.lang.ref.WeakReference;
16 import java.lang.reflect.Field;
17 import java.lang.reflect.Modifier;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.stream.Collectors;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * <p>
33  * MQTT does not directly support key-value configuration maps. We do need those for device discovery and thing
34  * configuration though.<br>
35  * Different conventions came up with different solutions, and one is to have "attribute classes".
36  * </p>
37  *
38  * <p>
39  * An attribute class is a java class that extends {@link AbstractMqttAttributeClass} and
40  * contains annotated fields where each field corresponds to a MQTT topic.
41  * To automatically subscribe to all fields, a call to
42  * {@link #subscribeAndReceive(MqttBrokerConnection, ScheduledExecutorService, String, AttributeChanged, int)} is
43  * required.
44  * Unsubscribe with a call to {@link #unsubscribe(AbstractMqttAttributeClass)}.
45  * </p>
46  *
47  * <p>
48  * The Homie 3.x convention uses attribute classes for Devices, Nodes and Properties configuration.
49  * </p>
50  *
51  * <p>
52  * The given object, called bean in this context, can consist of all basic java types boolean, int, double, long,
53  * String, respective object wrappers like Integer, Double, Long, the BigDecimal type and Enum types. Enums need to be
54  * declared within the bean class though. Arrays like String[] are supported as well, but require an annotation because
55  * the separator needs to be known.
56  * </p>
57  * A topic prefix can be defined for the entire class or for a single field. A field annotation overwrites a class
58  * annotation.
59  *
60  * An example:
61  *
62  * <pre>
63  * &#64;TopicPrefix("$")
64  * class MyAttributes extends AbstractMqttAttributeClass {
65  *    public String testString;
66  *    public @MapToField(splitCharacter=",") String[] multipleStrings;
67  *
68  *    public int anInt = 2;
69  *
70  *    public enum AnEnum {
71  *      Value1,
72  *      Value2
73  *    };
74  *    public AnEnum anEnum = AnEnum.Value1;
75  *
76  *    public BigDecimal aDecimalValue
77  *
78  *    &#64;Override
79  *    public Object getFieldsOf() {
80  *        return this;
81  *    }
82  * };
83  * </pre>
84  *
85  * You would use this class in this way:
86  *
87  * <pre>
88  * MyAttributes bean = new MyAttributes();
89  * bean.subscribe(connection, new ScheduledExecutorService(), "mqtt/topic/bean", null, 500)
90  *         .thenRun(() -> System.out.println("subscribed"));
91  * </pre>
92  *
93  * The above attribute class would end up with subscriptions to "mqtt/topic/bean/$testString",
94  * "mqtt/topic/bean/$multipleStrings", "mqtt/topic/bean/$anInt" and so on. It is assumed that all MQTT messages are
95  * UTF-8 strings.
96  *
97  * @author David Graeff - Initial contribution
98  */
99 @NonNullByDefault
100 public abstract class AbstractMqttAttributeClass implements SubscribeFieldToMQTTtopic.FieldChanged {
101     private final Logger logger = LoggerFactory.getLogger(AbstractMqttAttributeClass.class);
102     protected transient List<SubscribeFieldToMQTTtopic> subscriptions = new ArrayList<>();
103     public transient WeakReference<@Nullable MqttBrokerConnection> connection = new WeakReference<>(null);
104     protected transient WeakReference<@Nullable ScheduledExecutorService> scheduler = new WeakReference<>(null);
105     private final String prefix;
106     private transient String basetopic = "";
107     protected transient AttributeChanged attributeChangedListener = (b, c, d, e, f) -> {
108     };
109     private transient boolean complete = false;
110
111     /**
112      * Implement this interface to be notified of an updated field.
113      */
114     public interface AttributeChanged {
115         /**
116          * An attribute has changed
117          *
118          * @param name The name of the field
119          * @param value The new value
120          * @param connection The broker connection
121          * @param scheduler The scheduler that was used for timeouts
122          * @param allMandatoryFieldsReceived True if now all mandatory fields have values
123          */
124         void attributeChanged(String name, Object value, MqttBrokerConnection connection,
125                 ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived);
126     }
127
128     @SuppressWarnings("null")
129     protected AbstractMqttAttributeClass() {
130         TopicPrefix topicUsesPrefix = getFieldsOf().getClass().getAnnotation(TopicPrefix.class);
131         prefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : "";
132     }
133
134     /**
135      * Unsubscribe from all topics of the managed object.
136      *
137      * @param connection A broker connection to remove the subscriptions from.
138      * @param objectWithFields A bean class
139      * @return Returns a future that completes as soon as all unsubscriptions have been performed.
140      */
141     public CompletableFuture<@Nullable Void> unsubscribe() {
142         final MqttBrokerConnection connection = this.connection.get();
143         if (connection == null) {
144             subscriptions.clear();
145             return CompletableFuture.completedFuture(null);
146         }
147
148         final CompletableFuture<?>[] futures = subscriptions.stream().map(m -> connection.unsubscribe(m.topic, m))
149                 .toArray(CompletableFuture[]::new);
150         subscriptions.clear();
151         return CompletableFuture.allOf(futures);
152     }
153
154     /**
155      * Subscribe to all subtopics on a MQTT broker connection base topic that match field names of s java object.
156      * The fields will be kept in sync with their respective topics. Optionally, you can register update-observers for
157      * specific fields.
158      *
159      * @param connection A MQTT broker connection.
160      * @param scheduler A scheduler for timeouts.
161      * @param basetopic The base topic. Given a base topic of "base/topic", a field "test" would be registered as
162      *            "base/topic/test".
163      * @param attributeChangedListener Field change listener
164      * @param timeout Timeout per subscription in milliseconds. The returned future completes after this time
165      *            even if no
166      *            message has been received for a single MQTT topic.
167      * @return Returns a future that completes as soon as values for all subscriptions have been received or have timed
168      *         out.
169      */
170     public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection,
171             ScheduledExecutorService scheduler, String basetopic, @Nullable AttributeChanged attributeChangedListener,
172             int timeout) {
173         // We first need to unsubscribe old subscriptions if any
174         final CompletableFuture<@Nullable Void> startFuture;
175         if (!subscriptions.isEmpty()) {
176             startFuture = unsubscribe();
177         } else {
178             startFuture = CompletableFuture.completedFuture(null);
179         }
180
181         this.connection = new WeakReference<>(connection);
182         this.scheduler = new WeakReference<>(scheduler);
183         this.basetopic = basetopic;
184         if (attributeChangedListener != null) {
185             this.attributeChangedListener = attributeChangedListener;
186         } else {
187             this.attributeChangedListener = (b, c, d, e, f) -> {
188             };
189         }
190
191         subscriptions = getAllFields(getFieldsOf().getClass()).stream().filter(AbstractMqttAttributeClass::filterField)
192                 .map(this::mapFieldToSubscriber).collect(Collectors.toList());
193
194         final CompletableFuture<?>[] futures = subscriptions.stream()
195                 .map(m -> m.subscribeAndReceive(connection, timeout)).toArray(CompletableFuture[]::new);
196         return CompletableFuture.allOf(startFuture, CompletableFuture.allOf(futures));
197     }
198
199     /**
200      * Return fields of the given class as well as all super classes.
201      *
202      * @param clazz The class
203      * @return A list of Field objects
204      */
205     protected static List<Field> getAllFields(Class<?> clazz) {
206         List<Field> fields = new ArrayList<>();
207
208         Class<?> currentClass = clazz;
209         while (currentClass != null) {
210             fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));
211             currentClass = currentClass.getSuperclass();
212         }
213
214         return fields;
215     }
216
217     /**
218      * Return true if the given field is not final and not transient
219      */
220     protected static boolean filterField(Field field) {
221         return !Modifier.isFinal(field.getModifiers()) && !Modifier.isTransient(field.getModifiers())
222                 && !Modifier.isStatic(field.getModifiers());
223     }
224
225     /**
226      * Maps the given field to a newly created {@link SubscribeFieldToMQTTtopic}.
227      * Requires the scheduler of this class to be set.
228      *
229      * @param field A field
230      * @return A newly created {@link SubscribeFieldToMQTTtopic}.
231      */
232     protected SubscribeFieldToMQTTtopic mapFieldToSubscriber(Field field) {
233         final ScheduledExecutorService scheduler = this.scheduler.get();
234         if (scheduler == null) {
235             throw new IllegalStateException("No scheduler set!");
236         }
237
238         MandatoryField mandatoryField = field.getAnnotation(MandatoryField.class);
239         @SuppressWarnings("null")
240         boolean mandatory = mandatoryField != null;
241
242         TopicPrefix topicUsesPrefix = field.getAnnotation(TopicPrefix.class);
243         @SuppressWarnings("null")
244         String localPrefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : prefix;
245
246         final String topic = basetopic + "/" + localPrefix + field.getName();
247
248         return createSubscriber(scheduler, field, topic, mandatory);
249     }
250
251     /**
252      * Creates a field subscriber for the given field on the given object
253      *
254      * @param scheduler A scheduler for the timeout functionality
255      * @param field The field
256      * @param topic The full topic to subscribe to
257      * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
258      * @return Returns a MQTT message subscriber for a single class field
259      */
260     public SubscribeFieldToMQTTtopic createSubscriber(ScheduledExecutorService scheduler, Field field, String topic,
261             boolean mandatory) {
262         return new SubscribeFieldToMQTTtopic(scheduler, field, this, topic, mandatory);
263     }
264
265     /**
266      * Return true if this attribute class has received a value for each mandatory field.
267      * In contrast to the parameter "allMandatoryFieldsReceived" of
268      * {@link AttributeChanged#attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)}
269      * this flag will only be updated after that call.
270      *
271      * <p>
272      * You can use this behaviour to compare if a changed field was the last one to complete
273      * this attribute class. E.g.:
274      * </p>
275      *
276      * <pre>
277      * void attributeChanged(..., boolean allMandatoryFieldsReceived) {
278      *   if (allMandatoryFieldsReceived && !attributes.isComplete()) {
279      *      // The attribute class is now complete but wasn't before...
280      *   }
281      * }
282      * </pre>
283      */
284     public boolean isComplete() {
285         return complete;
286     }
287
288     /**
289      * One of the observed MQTT topics got a new value. Apply this to the given field now
290      * and propagate the changed value event.
291      */
292     @Override
293     public void fieldChanged(Field field, Object value) {
294         // This object holds only a weak reference to connection and scheduler.
295         // Attribute classes should perform an unsubscribe when a connection is lost.
296         // We fail the future exceptionally here if that didn't happen so that everyone knows.
297         final MqttBrokerConnection connection = this.connection.get();
298         final ScheduledExecutorService scheduler = this.scheduler.get();
299         if (connection == null || scheduler == null) {
300             logger.warn("No connection or scheduler set!");
301             return;
302         }
303         // Set field. It is not a reason to fail the future exceptionally if a field could not be set.
304         // But at least issue a warning to the log.
305         try {
306             field.set(getFieldsOf(), value);
307             final boolean newComplete = !subscriptions.stream().anyMatch(s -> s.isMandatory() && !s.hasReceivedValue());
308             attributeChangedListener.attributeChanged(field.getName(), value, connection, scheduler, newComplete);
309             complete = newComplete;
310         } catch (IllegalArgumentException | IllegalAccessException e) {
311             logger.warn("Could not assign value {} to field {}", value, field, e);
312         }
313     }
314
315     /**
316      * Implement this method in your field class and return "this".
317      */
318     public abstract Object getFieldsOf();
319 }