]> git.basschouten.com Git - openhab-addons.git/blob
f98c905119c6bdf458c8867cf60ed65241c86bac
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic.mapping;
14
15 import java.lang.reflect.Field;
16 import java.math.BigDecimal;
17 import java.nio.charset.StandardCharsets;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttException;
27 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Use this class to subscribe to a given MQTT topic via a {@link MqttMessageSubscriber}
33  * and convert received values to the type of the given field and notify the user of the changed value.
34  *
35  * Used by {@link AbstractMqttAttributeClass}.
36  *
37  * @author David Graeff - Initial contribution
38  */
39 @NonNullByDefault
40 public class SubscribeFieldToMQTTtopic implements MqttMessageSubscriber {
41     private final Logger logger = LoggerFactory.getLogger(SubscribeFieldToMQTTtopic.class);
42     protected CompletableFuture<@Nullable Void> future = new CompletableFuture<>();
43     public final Field field;
44     public final FieldChanged changeConsumer;
45     public final String topic;
46     private final ScheduledExecutorService scheduler;
47     private @Nullable ScheduledFuture<?> scheduledFuture;
48     private final boolean mandatory;
49     private boolean receivedValue = false;
50
51     /**
52      * Implement this interface to be notified of an updated field.
53      */
54     public interface FieldChanged {
55         void fieldChanged(Field field, Object value);
56     }
57
58     /**
59      * Create a {@link SubscribeFieldToMQTTtopic}.
60      *
61      * @param scheduler A scheduler to realize subscription timeouts.
62      * @param field The destination field.
63      * @param fieldChangeListener A listener for field changes. This is only called if the received value
64      *            could successfully be converted to the field type.
65      * @param topic The MQTT topic.
66      * @param mandatory True of this field is a mandatory one. A timeout will cause a future to complete exceptionally.
67      */
68     public SubscribeFieldToMQTTtopic(ScheduledExecutorService scheduler, Field field, FieldChanged fieldChangeListener,
69             String topic, boolean mandatory) {
70         this.scheduler = scheduler;
71         this.field = field;
72         this.changeConsumer = fieldChangeListener;
73         this.topic = topic;
74         this.mandatory = mandatory;
75     }
76
77     static Object numberConvert(Object value, Class<?> type) throws IllegalArgumentException, NumberFormatException {
78         Object result = value;
79         // Handle the conversion case of BigDecimal to Float,Double,Long,Integer and the respective
80         // primitive types
81         String typeName = type.getSimpleName();
82         if (value instanceof BigDecimal && !type.equals(BigDecimal.class)) {
83             BigDecimal bdValue = (BigDecimal) value;
84             if (type.equals(Float.class) || typeName.equals("float")) {
85                 result = bdValue.floatValue();
86             } else if (type.equals(Double.class) || typeName.equals("double")) {
87                 result = bdValue.doubleValue();
88             } else if (type.equals(Long.class) || typeName.equals("long")) {
89                 result = bdValue.longValue();
90             } else if (type.equals(Integer.class) || typeName.equals("int")) {
91                 result = bdValue.intValue();
92             }
93         } else
94         // Handle the conversion case of String to Float,Double,Long,Integer,BigDecimal and the respective
95         // primitive types
96         if (value instanceof String && !type.equals(String.class)) {
97             String bdValue = (String) value;
98             if (type.equals(Float.class) || typeName.equals("float")) {
99                 result = Float.valueOf(bdValue);
100             } else if (type.equals(Double.class) || typeName.equals("double")) {
101                 result = Double.valueOf(bdValue);
102             } else if (type.equals(Long.class) || typeName.equals("long")) {
103                 result = Long.valueOf(bdValue);
104             } else if (type.equals(BigDecimal.class)) {
105                 result = new BigDecimal(bdValue);
106             } else if (type.equals(Integer.class) || typeName.equals("int")) {
107                 result = Integer.valueOf(bdValue);
108             } else if (type.equals(Boolean.class) || typeName.equals("boolean")) {
109                 result = Boolean.valueOf(bdValue);
110             } else if (type.isEnum()) {
111                 @SuppressWarnings({ "rawtypes", "unchecked" })
112                 final Class<? extends Enum> enumType = (Class<? extends Enum>) type;
113                 @SuppressWarnings("unchecked")
114                 Enum<?> enumValue = Enum.valueOf(enumType, value.toString());
115                 result = enumValue;
116             }
117         }
118         return result;
119     }
120
121     /**
122      * Callback by the {@link MqttBrokerConnection} if a matching topic received a new value.
123      * Because routing is already done by aforementioned class, the topic parameter is not checked again.
124      *
125      * @param topic The MQTT topic. Not used.
126      * @param payload The MQTT payload.
127      */
128     @SuppressWarnings({ "null", "unused" })
129     @Override
130     public void processMessage(String topic, byte[] payload) {
131         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
132         if (scheduledFuture != null) { // Cancel timeout
133             scheduledFuture.cancel(false);
134             this.scheduledFuture = null;
135         }
136
137         if (payload.length == 0) {
138             logger.debug("NULL payload on topic: {}", topic);
139             return;
140         }
141
142         String valueStr = new String(payload, StandardCharsets.UTF_8);
143
144         // Check if there is a manipulation annotation attached to the field
145         final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
146         Object value;
147         if (transform != null) {
148             // Add a prefix/suffix to the value
149             valueStr = transform.prefix() + valueStr + transform.suffix();
150             // Split the value if the field is an array. Convert numbers/enums if necessary.
151             value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
152                     : numberConvert(valueStr, field.getType());
153         } else if (field.getType().isArray()) {
154             throw new IllegalArgumentException("No split character defined!");
155         } else {
156             // Convert numbers/enums if necessary
157             value = numberConvert(valueStr, field.getType());
158         }
159         receivedValue = true;
160         changeConsumer.fieldChanged(field, value);
161         future.complete(null);
162     }
163
164     void timeoutReached() {
165         if (mandatory) {
166             future.completeExceptionally(new Exception("Did not receive mandatory topic value: " + topic));
167         } else {
168             future.complete(null);
169         }
170     }
171
172     /**
173      * Subscribe to the MQTT topic. A {@link SubscribeFieldToMQTTtopic} cannot be stopped.
174      * You need to manually unsubscribe from the {@link #topic} before disposing.
175      *
176      * @param connection An MQTT connection.
177      * @param timeout Timeout in milliseconds. The returned future completes after this time even if no message has
178      *            been received for the MQTT topic.
179      * @return Returns a future that completes if either a value is received for the topic or a timeout happens.
180      * @throws MqttException If an MQTT IO exception happens this exception is thrown.
181      */
182     public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection, int timeout) {
183         connection.subscribe(topic, this).exceptionally(e -> {
184             logger.debug("Failed to subscribe to topic {}", topic, e);
185             final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
186             if (scheduledFuture != null) { // Cancel timeout
187                 scheduledFuture.cancel(false);
188                 this.scheduledFuture = null;
189             }
190             future.complete(null);
191             return false;
192         }).thenRun(() -> {
193             if (!future.isDone()) {
194                 this.scheduledFuture = scheduler.schedule(this::timeoutReached, timeout, TimeUnit.MILLISECONDS);
195             }
196         });
197         return future;
198     }
199
200     /**
201      * Return true if the corresponding field has received a value at least once.
202      */
203     public boolean hasReceivedValue() {
204         return receivedValue;
205     }
206
207     /**
208      * Return true if the corresponding field is mandatory.
209      */
210     public boolean isMandatory() {
211         return mandatory;
212     }
213 }