2 * Copyright (c) 2010-2023 Contributors to the openHAB project
4 * See the NOTICE file(s) distributed with this work for additional
7 * This program and the accompanying materials are made available under the
8 * terms of the Eclipse Public License 2.0 which is available at
9 * http://www.eclipse.org/legal/epl-2.0
11 * SPDX-License-Identifier: EPL-2.0
13 package org.openhab.binding.mqtt.generic.mapping;
15 import java.lang.ref.WeakReference;
16 import java.lang.reflect.Field;
17 import java.lang.reflect.Modifier;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.List;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.stream.Collectors;
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
33 * MQTT does not directly support key-value configuration maps. We do need those for device discovery and thing
34 * configuration though.<br>
35 * Different conventions came up with different solutions, and one is to have "attribute classes".
39 * An attribute class is a java class that extends {@link AbstractMqttAttributeClass} and
40 * contains annotated fields where each field corresponds to a MQTT topic.
41 * To automatically subscribe to all fields, a call to
42 * {@link #subscribeAndReceive(MqttBrokerConnection, ScheduledExecutorService, String, AttributeChanged, int)} is
44 * Unsubscribe with a call to {@link #unsubscribe()}.
48 * The Homie 3.x convention uses attribute classes for Devices, Nodes and Properties configuration.
52 * The given object, called bean in this context, can consist of all basic java types boolean, int, double, long,
53 * String, respective object wrappers like Integer, Double, Long, the BigDecimal type and Enum types. Enums need to be
54 * declared within the bean class though. Arrays like String[] are supported as well, but require an annotation because
55 * the separator needs to be known.
57 * A topic prefix can be defined for the entire class or for a single field. A field annotation overwrites a class
63 * @TopicPrefix("$")
64 * class MyAttributes extends AbstractMqttAttributeClass {
65 * public String testString;
66 * public @MapToField(splitCharacter=",") String[] multipleStrings;
68 * public int anInt = 2;
70 * public enum AnEnum {
74 * public AnEnum anEnum = AnEnum.Value1;
76 * public BigDecimal aDecimalValue
79 * public Object getFieldsOf() {
85 * You would use this class in this way:
88 * MyAttributes bean = new MyAttributes();
89 * bean.subscribe(connection, new ScheduledExecutorService(), "mqtt/topic/bean", null, 500)
90 * .thenRun(() -> System.out.println("subscribed"));
93 * The above attribute class would end up with subscriptions to "mqtt/topic/bean/$testString",
94 * "mqtt/topic/bean/$multipleStrings", "mqtt/topic/bean/$anInt" and so on. It is assumed that all MQTT messages are
97 * @author David Graeff - Initial contribution
100 public abstract class AbstractMqttAttributeClass implements SubscribeFieldToMQTTtopic.FieldChanged {
101 private final Logger logger = LoggerFactory.getLogger(AbstractMqttAttributeClass.class);
102 protected transient List<SubscribeFieldToMQTTtopic> subscriptions = new ArrayList<>();
103 public transient WeakReference<@Nullable MqttBrokerConnection> connection = new WeakReference<>(null);
104 protected transient WeakReference<@Nullable ScheduledExecutorService> scheduler = new WeakReference<>(null);
105 private final String prefix;
106 private transient String basetopic = "";
107 protected transient AttributeChanged attributeChangedListener = (b, c, d, e, f) -> {
109 private transient boolean complete = false;
112 * Implement this interface to be notified of an updated field.
114 public interface AttributeChanged {
116 * An attribute has changed
118 * @param name The name of the field
119 * @param value The new value
120 * @param connection The broker connection
121 * @param scheduler The scheduler that was used for timeouts
122 * @param allMandatoryFieldsReceived True if now all mandatory fields have values
124 void attributeChanged(String name, Object value, MqttBrokerConnection connection,
125 ScheduledExecutorService scheduler, boolean allMandatoryFieldsReceived);
128 @SuppressWarnings("null")
129 protected AbstractMqttAttributeClass() {
130 TopicPrefix topicUsesPrefix = getFieldsOf().getClass().getAnnotation(TopicPrefix.class);
131 prefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : "";
135 * Unsubscribe from all topics of the managed object.
137 * @return Returns a future that completes as soon as all unsubscriptions have been performed.
139 public CompletableFuture<@Nullable Void> unsubscribe() {
140 final MqttBrokerConnection connection = this.connection.get();
141 if (connection == null) {
142 subscriptions.clear();
143 return CompletableFuture.completedFuture(null);
146 final CompletableFuture<?>[] futures = subscriptions.stream().map(m -> connection.unsubscribe(m.topic, m))
147 .toArray(CompletableFuture[]::new);
148 subscriptions.clear();
149 return CompletableFuture.allOf(futures);
153 * Subscribe to all subtopics on a MQTT broker connection base topic that match field names of s java object.
154 * The fields will be kept in sync with their respective topics. Optionally, you can register update-observers for
157 * @param connection A MQTT broker connection.
158 * @param scheduler A scheduler for timeouts.
159 * @param basetopic The base topic. Given a base topic of "base/topic", a field "test" would be registered as
161 * @param attributeChangedListener Field change listener
162 * @param timeout Timeout per subscription in milliseconds. The returned future completes after this time
164 * message has been received for a single MQTT topic.
165 * @return Returns a future that completes as soon as values for all subscriptions have been received or have timed
168 public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection,
169 ScheduledExecutorService scheduler, String basetopic, @Nullable AttributeChanged attributeChangedListener,
171 // We first need to unsubscribe old subscriptions if any
172 final CompletableFuture<@Nullable Void> startFuture;
173 if (!subscriptions.isEmpty()) {
174 startFuture = unsubscribe();
176 startFuture = CompletableFuture.completedFuture(null);
179 this.connection = new WeakReference<>(connection);
180 this.scheduler = new WeakReference<>(scheduler);
181 this.basetopic = basetopic;
182 if (attributeChangedListener != null) {
183 this.attributeChangedListener = attributeChangedListener;
185 this.attributeChangedListener = (b, c, d, e, f) -> {
189 subscriptions = getAllFields(getFieldsOf().getClass()).stream().filter(AbstractMqttAttributeClass::filterField)
190 .map(this::mapFieldToSubscriber).collect(Collectors.toList());
192 final CompletableFuture<?>[] futures = subscriptions.stream()
193 .map(m -> m.subscribeAndReceive(connection, timeout)).toArray(CompletableFuture[]::new);
194 return CompletableFuture.allOf(startFuture, CompletableFuture.allOf(futures));
198 * Return fields of the given class as well as all super classes.
200 * @param clazz The class
201 * @return A list of Field objects
203 protected static List<Field> getAllFields(Class<?> clazz) {
204 List<Field> fields = new ArrayList<>();
206 Class<?> currentClass = clazz;
207 while (currentClass != null) {
208 fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));
209 currentClass = currentClass.getSuperclass();
216 * Return true if the given field is not final and not transient
218 protected static boolean filterField(Field field) {
219 return !Modifier.isFinal(field.getModifiers()) && !Modifier.isTransient(field.getModifiers())
220 && !Modifier.isStatic(field.getModifiers());
224 * Maps the given field to a newly created {@link SubscribeFieldToMQTTtopic}.
225 * Requires the scheduler of this class to be set.
227 * @param field A field
228 * @return A newly created {@link SubscribeFieldToMQTTtopic}.
230 protected SubscribeFieldToMQTTtopic mapFieldToSubscriber(Field field) {
231 final ScheduledExecutorService scheduler = this.scheduler.get();
232 if (scheduler == null) {
233 throw new IllegalStateException("No scheduler set!");
236 MandatoryField mandatoryField = field.getAnnotation(MandatoryField.class);
237 @SuppressWarnings("null")
238 boolean mandatory = mandatoryField != null;
240 TopicPrefix topicUsesPrefix = field.getAnnotation(TopicPrefix.class);
241 @SuppressWarnings("null")
242 String localPrefix = (topicUsesPrefix != null) ? topicUsesPrefix.value() : prefix;
244 final String topic = basetopic + "/" + localPrefix + field.getName();
246 return createSubscriber(scheduler, field, topic, mandatory);
250 * Creates a field subscriber for the given field on the given object
252 * @param scheduler A scheduler for the timeout functionality
253 * @param field The field
254 * @param topic The full topic to subscribe to
255 * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
256 * @return Returns a MQTT message subscriber for a single class field
258 public SubscribeFieldToMQTTtopic createSubscriber(ScheduledExecutorService scheduler, Field field, String topic,
260 return new SubscribeFieldToMQTTtopic(scheduler, field, this, topic, mandatory);
264 * Return true if this attribute class has received a value for each mandatory field.
265 * In contrast to the parameter "allMandatoryFieldsReceived" of
266 * {@link AttributeChanged#attributeChanged(String, Object, MqttBrokerConnection, ScheduledExecutorService, boolean)}
267 * this flag will only be updated after that call.
270 * You can use this behaviour to compare if a changed field was the last one to complete
271 * this attribute class. E.g.:
275 * void attributeChanged(..., boolean allMandatoryFieldsReceived) {
276 * if (allMandatoryFieldsReceived && !attributes.isComplete()) {
277 * // The attribute class is now complete but wasn't before...
282 public boolean isComplete() {
287 * One of the observed MQTT topics got a new value. Apply this to the given field now
288 * and propagate the changed value event.
291 public void fieldChanged(Field field, Object value) {
292 // This object holds only a weak reference to connection and scheduler.
293 // Attribute classes should perform an unsubscribe when a connection is lost.
294 // We fail the future exceptionally here if that didn't happen so that everyone knows.
295 final MqttBrokerConnection connection = this.connection.get();
296 final ScheduledExecutorService scheduler = this.scheduler.get();
297 if (connection == null || scheduler == null) {
298 logger.warn("No connection or scheduler set!");
301 // Set field. It is not a reason to fail the future exceptionally if a field could not be set.
302 // But at least issue a warning to the log.
304 field.set(getFieldsOf(), value);
305 final boolean newComplete = !subscriptions.stream().anyMatch(s -> s.isMandatory() && !s.hasReceivedValue());
306 attributeChangedListener.attributeChanged(field.getName(), value, connection, scheduler, newComplete);
307 complete = newComplete;
308 } catch (IllegalArgumentException | IllegalAccessException e) {
309 logger.warn("Could not assign value {} to field {}", value, field, e);
314 * Implement this method in your field class and return "this".
316 public abstract Object getFieldsOf();