]> git.basschouten.com Git - openhab-addons.git/blob
cfe3f7316caab83629d8e0368fbed28d82aefd10
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2023 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.mapping;
14
15 import java.lang.reflect.Field;
16 import java.math.BigDecimal;
17 import java.nio.charset.StandardCharsets;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttException;
27 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Use this class to subscribe to a given MQTT topic via a {@link MqttMessageSubscriber}
33  * and convert received values to the type of the given field and notify the user of the changed value.
34  *
35  * Used by {@link AbstractMqttAttributeClass}.
36  *
37  * @author David Graeff - Initial contribution
38  */
39 @NonNullByDefault
40 public class SubscribeFieldToMQTTtopic implements MqttMessageSubscriber {
41     private final Logger logger = LoggerFactory.getLogger(SubscribeFieldToMQTTtopic.class);
42     protected CompletableFuture<@Nullable Void> future = new CompletableFuture<>();
43     public final Field field;
44     public final FieldChanged changeConsumer;
45     public final String topic;
46     private final ScheduledExecutorService scheduler;
47     private @Nullable ScheduledFuture<?> scheduledFuture;
48     private final boolean mandatory;
49     private boolean receivedValue = false;
50
51     /**
52      * Implement this interface to be notified of an updated field.
53      */
54     public interface FieldChanged {
55         void fieldChanged(Field field, Object value);
56     }
57
58     /**
59      * Create a {@link SubscribeFieldToMQTTtopic}.
60      *
61      * @param scheduler A scheduler to realize subscription timeouts.
62      * @param field The destination field.
63      * @param fieldChangeListener A listener for field changes. This is only called if the received value
64      *            could successfully be converted to the field type.
65      * @param topic The MQTT topic.
66      * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
67      */
68     public SubscribeFieldToMQTTtopic(ScheduledExecutorService scheduler, Field field, FieldChanged fieldChangeListener,
69             String topic, boolean mandatory) {
70         this.scheduler = scheduler;
71         this.field = field;
72         this.changeConsumer = fieldChangeListener;
73         this.topic = topic;
74         this.mandatory = mandatory;
75     }
76
77     static Object numberConvert(Object value, Class<?> type) throws IllegalArgumentException, NumberFormatException {
78         Object result = value;
79         // Handle the conversion case of BigDecimal to Float,Double,Long,Integer and the respective
80         // primitive types
81         String typeName = type.getSimpleName();
82         if (value instanceof BigDecimal && !type.equals(BigDecimal.class)) {
83             BigDecimal bdValue = (BigDecimal) value;
84             if (type.equals(Float.class) || "float".equals(typeName)) {
85                 result = bdValue.floatValue();
86             } else if (type.equals(Double.class) || "double".equals(typeName)) {
87                 result = bdValue.doubleValue();
88             } else if (type.equals(Long.class) || "long".equals(typeName)) {
89                 result = bdValue.longValue();
90             } else if (type.equals(Integer.class) || "int".equals(typeName)) {
91                 result = bdValue.intValue();
92             }
93         } else
94         // Handle the conversion case of String to Float,Double,Long,Integer,BigDecimal and the respective
95         // primitive types
96         if (value instanceof String && !type.equals(String.class)) {
97             String bdValue = (String) value;
98             if (type.equals(Float.class) || "float".equals(typeName)) {
99                 result = Float.valueOf(bdValue);
100             } else if (type.equals(Double.class) || "double".equals(typeName)) {
101                 result = Double.valueOf(bdValue);
102             } else if (type.equals(Long.class) || "long".equals(typeName)) {
103                 result = Long.valueOf(bdValue);
104             } else if (type.equals(BigDecimal.class)) {
105                 result = new BigDecimal(bdValue);
106             } else if (type.equals(Integer.class) || "int".equals(typeName)) {
107                 result = Integer.valueOf(bdValue);
108             } else if (type.equals(Boolean.class) || "boolean".equals(typeName)) {
109                 result = Boolean.valueOf(bdValue);
110             } else if (type.isEnum()) {
111                 @SuppressWarnings({ "rawtypes", "unchecked" })
112                 final Class<? extends Enum> enumType = (Class<? extends Enum>) type;
113                 @SuppressWarnings("unchecked")
114                 Enum<?> enumValue = Enum.valueOf(enumType, value.toString());
115                 result = enumValue;
116             }
117         }
118         return result;
119     }
120
121     /**
122      * Callback by the {@link MqttBrokerConnection} if a matching topic received a new value.
123      * Because routing is already done by aforementioned class, the topic parameter is not checked again.
124      *
125      * @param topic The MQTT topic. Not used.
126      * @param payload The MQTT payload.
127      */
128     @SuppressWarnings({ "null", "unused" })
129     @Override
130     public void processMessage(String topic, byte[] payload) {
131         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
132         if (scheduledFuture != null) { // Cancel timeout
133             scheduledFuture.cancel(false);
134             this.scheduledFuture = null;
135         }
136
137         if (payload.length == 0) {
138             logger.debug("NULL payload on topic: {}", topic);
139             return;
140         }
141
142         String valueStr = new String(payload, StandardCharsets.UTF_8);
143         String originalValueStr = valueStr;
144
145         // Check if there is a manipulation annotation attached to the field
146         try {
147             final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
148             Object value;
149             if (transform != null) {
150                 // Add a prefix/suffix to the value
151                 valueStr = transform.prefix() + valueStr + transform.suffix();
152                 // Split the value if the field is an array. Convert numbers/enums if necessary.
153                 value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
154                         : numberConvert(valueStr, field.getType());
155             } else if (field.getType().isArray()) {
156                 throw new IllegalArgumentException("No split character defined!");
157             } else {
158                 // Convert numbers/enums if necessary
159                 value = numberConvert(valueStr, field.getType());
160             }
161             receivedValue = true;
162             changeConsumer.fieldChanged(field, value);
163             future.complete(null);
164         } catch (IllegalArgumentException e) {
165             if (mandatory) {
166                 future.completeExceptionally(e);
167             } else {
168                 logger.warn("Unable to interpret {} from topic {}", originalValueStr, topic);
169                 future.complete(null);
170             }
171         }
172     }
173
174     void timeoutReached() {
175         if (mandatory) {
176             future.completeExceptionally(new Exception("Did not receive mandatory topic value: " + topic));
177         } else {
178             future.complete(null);
179         }
180     }
181
182     /**
183      * Subscribe to the MQTT topic. A {@link SubscribeFieldToMQTTtopic} cannot be stopped.
184      * You need to manually unsubscribe from the {@link #topic} before disposing.
185      *
186      * @param connection An MQTT connection.
187      * @param timeout Timeout in milliseconds. The returned future completes after this time even if no message has
188      *            been received for the MQTT topic.
189      * @return Returns a future that completes if either a value is received for the topic or a timeout happens.
190      * @throws MqttException If an MQTT IO exception happens this exception is thrown.
191      */
192     public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection, int timeout) {
193         connection.subscribe(topic, this).exceptionally(e -> {
194             logger.debug("Failed to subscribe to topic {}", topic, e);
195             final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
196             if (scheduledFuture != null) { // Cancel timeout
197                 scheduledFuture.cancel(false);
198                 this.scheduledFuture = null;
199             }
200             future.complete(null);
201             return false;
202         }).thenRun(() -> {
203             if (!future.isDone()) {
204                 this.scheduledFuture = scheduler.schedule(this::timeoutReached, timeout, TimeUnit.MILLISECONDS);
205             }
206         });
207         return future;
208     }
209
210     /**
211      * Return true if the corresponding field has received a value at least once.
212      */
213     public boolean hasReceivedValue() {
214         return receivedValue;
215     }
216
217     /**
218      * Return true if the corresponding field is mandatory.
219      */
220     public boolean isMandatory() {
221         return mandatory;
222     }
223 }