]> git.basschouten.com Git - openhab-addons.git/blob
41866f0773937f82419b83eaa62690d385309efc
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.mapping;
14
15 import java.lang.reflect.Field;
16 import java.math.BigDecimal;
17 import java.nio.charset.StandardCharsets;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttException;
27 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Use this class to subscribe to a given MQTT topic via a {@link MqttMessageSubscriber}
33  * and convert received values to the type of the given field and notify the user of the changed value.
34  *
35  * Used by {@link AbstractMqttAttributeClass}.
36  *
37  * @author David Graeff - Initial contribution
38  */
39 @NonNullByDefault
40 public class SubscribeFieldToMQTTtopic implements MqttMessageSubscriber {
41     private final Logger logger = LoggerFactory.getLogger(SubscribeFieldToMQTTtopic.class);
42     protected CompletableFuture<@Nullable Void> future = new CompletableFuture<>();
43     public final Field field;
44     public final FieldChanged changeConsumer;
45     public final String topic;
46     private final ScheduledExecutorService scheduler;
47     private @Nullable ScheduledFuture<?> scheduledFuture;
48     private final boolean mandatory;
49     private boolean receivedValue = false;
50
51     /**
52      * Implement this interface to be notified of an updated field.
53      */
54     public interface FieldChanged {
55         void fieldChanged(Field field, Object value);
56     }
57
58     /**
59      * Create a {@link SubscribeFieldToMQTTtopic}.
60      *
61      * @param scheduler A scheduler to realize subscription timeouts.
62      * @param field The destination field.
63      * @param fieldChangeListener A listener for field changes. This is only called if the received value
64      *            could successfully be converted to the field type.
65      * @param topic The MQTT topic.
66      * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
67      */
68     public SubscribeFieldToMQTTtopic(ScheduledExecutorService scheduler, Field field, FieldChanged fieldChangeListener,
69             String topic, boolean mandatory) {
70         this.scheduler = scheduler;
71         this.field = field;
72         this.changeConsumer = fieldChangeListener;
73         this.topic = topic;
74         this.mandatory = mandatory;
75     }
76
77     static Object numberConvert(Object value, Class<?> type) throws IllegalArgumentException, NumberFormatException {
78         Object result = value;
79         // Handle the conversion case of BigDecimal to Float,Double,Long,Integer and the respective
80         // primitive types
81         String typeName = type.getSimpleName();
82         if (value instanceof BigDecimal bdValue && !type.equals(BigDecimal.class)) {
83             if (type.equals(Float.class) || "float".equals(typeName)) {
84                 result = bdValue.floatValue();
85             } else if (type.equals(Double.class) || "double".equals(typeName)) {
86                 result = bdValue.doubleValue();
87             } else if (type.equals(Long.class) || "long".equals(typeName)) {
88                 result = bdValue.longValue();
89             } else if (type.equals(Integer.class) || "int".equals(typeName)) {
90                 result = bdValue.intValue();
91             }
92         } else
93         // Handle the conversion case of String to Float,Double,Long,Integer,BigDecimal and the respective
94         // primitive types
95         if (value instanceof String bdValue && !type.equals(String.class)) {
96             if (type.equals(Float.class) || "float".equals(typeName)) {
97                 result = Float.valueOf(bdValue);
98             } else if (type.equals(Double.class) || "double".equals(typeName)) {
99                 result = Double.valueOf(bdValue);
100             } else if (type.equals(Long.class) || "long".equals(typeName)) {
101                 result = Long.valueOf(bdValue);
102             } else if (type.equals(BigDecimal.class)) {
103                 result = new BigDecimal(bdValue);
104             } else if (type.equals(Integer.class) || "int".equals(typeName)) {
105                 result = Integer.valueOf(bdValue);
106             } else if (type.equals(Boolean.class) || "boolean".equals(typeName)) {
107                 result = Boolean.valueOf(bdValue);
108             } else if (type.isEnum()) {
109                 @SuppressWarnings({ "rawtypes", "unchecked" })
110                 final Class<? extends Enum> enumType = (Class<? extends Enum>) type;
111                 @SuppressWarnings("unchecked")
112                 Enum<?> enumValue = Enum.valueOf(enumType, value.toString());
113                 result = enumValue;
114             }
115         }
116         return result;
117     }
118
119     /**
120      * Callback by the {@link MqttBrokerConnection} if a matching topic received a new value.
121      * Because routing is already done by aforementioned class, the topic parameter is not checked again.
122      *
123      * @param topic The MQTT topic. Not used.
124      * @param payload The MQTT payload.
125      */
126     @SuppressWarnings({ "null", "unused" })
127     @Override
128     public void processMessage(String topic, byte[] payload) {
129         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
130         if (scheduledFuture != null) { // Cancel timeout
131             scheduledFuture.cancel(false);
132             this.scheduledFuture = null;
133         }
134
135         if (payload.length == 0) {
136             logger.debug("NULL payload on topic: {}", topic);
137             return;
138         }
139
140         String valueStr = new String(payload, StandardCharsets.UTF_8);
141         String originalValueStr = valueStr;
142
143         // Check if there is a manipulation annotation attached to the field
144         try {
145             final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
146             Object value;
147             if (transform != null) {
148                 // Add a prefix/suffix to the value
149                 valueStr = transform.prefix() + valueStr + transform.suffix();
150                 // Split the value if the field is an array. Convert numbers/enums if necessary.
151                 value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
152                         : numberConvert(valueStr, field.getType());
153             } else if (field.getType().isArray()) {
154                 throw new IllegalArgumentException("No split character defined!");
155             } else {
156                 // Convert numbers/enums if necessary
157                 value = numberConvert(valueStr, field.getType());
158             }
159             receivedValue = true;
160             changeConsumer.fieldChanged(field, value);
161             future.complete(null);
162         } catch (IllegalArgumentException e) {
163             if (mandatory) {
164                 future.completeExceptionally(e);
165             } else {
166                 logger.warn("Unable to interpret {} from topic {}", originalValueStr, topic);
167                 future.complete(null);
168             }
169         }
170     }
171
172     void timeoutReached() {
173         if (mandatory) {
174             future.completeExceptionally(new Exception("Did not receive mandatory topic value: " + topic));
175         } else {
176             future.complete(null);
177         }
178     }
179
180     /**
181      * Subscribe to the MQTT topic. A {@link SubscribeFieldToMQTTtopic} cannot be stopped.
182      * You need to manually unsubscribe from the {@link #topic} before disposing.
183      *
184      * @param connection An MQTT connection.
185      * @param timeout Timeout in milliseconds. The returned future completes after this time even if no message has
186      *            been received for the MQTT topic.
187      * @return Returns a future that completes if either a value is received for the topic or a timeout happens.
188      * @throws MqttException If an MQTT IO exception happens this exception is thrown.
189      */
190     public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection, int timeout) {
191         connection.subscribe(topic, this).exceptionally(e -> {
192             logger.debug("Failed to subscribe to topic {}", topic, e);
193             final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
194             if (scheduledFuture != null) { // Cancel timeout
195                 scheduledFuture.cancel(false);
196                 this.scheduledFuture = null;
197             }
198             future.complete(null);
199             return false;
200         }).thenRun(() -> {
201             if (!future.isDone()) {
202                 this.scheduledFuture = scheduler.schedule(this::timeoutReached, timeout, TimeUnit.MILLISECONDS);
203             }
204         });
205         return future;
206     }
207
208     /**
209      * Return true if the corresponding field has received a value at least once.
210      */
211     public boolean hasReceivedValue() {
212         return receivedValue;
213     }
214
215     /**
216      * Return true if the corresponding field is mandatory.
217      */
218     public boolean isMandatory() {
219         return mandatory;
220     }
221 }