/**
 * Copyright (c) 2010-2023 Contributors to the openHAB project
 *
 * See the NOTICE file(s) distributed with this work for additional
 * information.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0
 *
 * SPDX-License-Identifier: EPL-2.0
 */
13 package org.openhab.binding.mqtt.generic.mapping;
15 import java.lang.reflect.Field;
16 import java.math.BigDecimal;
17 import java.nio.charset.StandardCharsets;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
26 import org.openhab.core.io.transport.mqtt.MqttException;
27 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
/**
 * Use this class to subscribe to a given MQTT topic via a {@link MqttMessageSubscriber}
 * and convert received values to the type of the given field and notify the user of the changed value.
 *
 * Used by {@link AbstractMqttAttributeClass}.
 *
 * @author David Graeff - Initial contribution
 */
40 public class SubscribeFieldToMQTTtopic implements MqttMessageSubscriber {
41 private final Logger logger = LoggerFactory.getLogger(SubscribeFieldToMQTTtopic.class);
42 protected CompletableFuture<@Nullable Void> future = new CompletableFuture<>();
43 public final Field field;
44 public final FieldChanged changeConsumer;
45 public final String topic;
46 private final ScheduledExecutorService scheduler;
47 private @Nullable ScheduledFuture<?> scheduledFuture;
48 private final boolean mandatory;
49 private boolean receivedValue = false;
52 * Implement this interface to be notified of an updated field.
54 public interface FieldChanged {
55 void fieldChanged(Field field, Object value);
/**
 * Create a {@link SubscribeFieldToMQTTtopic}.
 *
 * @param scheduler A scheduler to realize subscription timeouts.
 * @param field The destination field.
 * @param fieldChangeListener A listener for field changes. This is only called if the received value
 *            could successfully be converted to the field type.
 * @param topic The MQTT topic.
 * @param mandatory True if this field is a mandatory one. A timeout will cause a future to complete exceptionally.
 */
public SubscribeFieldToMQTTtopic(ScheduledExecutorService scheduler, Field field, FieldChanged fieldChangeListener,
        String topic, boolean mandatory) {
    this.scheduler = scheduler;
    // 'field' and 'topic' are final members and must be assigned here.
    this.field = field;
    this.changeConsumer = fieldChangeListener;
    this.topic = topic;
    this.mandatory = mandatory;
}
77 static Object numberConvert(Object value, Class<?> type) throws IllegalArgumentException, NumberFormatException {
78 Object result = value;
79 // Handle the conversion case of BigDecimal to Float,Double,Long,Integer and the respective
81 String typeName = type.getSimpleName();
82 if (value instanceof BigDecimal bdValue && !type.equals(BigDecimal.class)) {
83 if (type.equals(Float.class) || "float".equals(typeName)) {
84 result = bdValue.floatValue();
85 } else if (type.equals(Double.class) || "double".equals(typeName)) {
86 result = bdValue.doubleValue();
87 } else if (type.equals(Long.class) || "long".equals(typeName)) {
88 result = bdValue.longValue();
89 } else if (type.equals(Integer.class) || "int".equals(typeName)) {
90 result = bdValue.intValue();
93 // Handle the conversion case of String to Float,Double,Long,Integer,BigDecimal and the respective
95 if (value instanceof String bdValue && !type.equals(String.class)) {
96 if (type.equals(Float.class) || "float".equals(typeName)) {
97 result = Float.valueOf(bdValue);
98 } else if (type.equals(Double.class) || "double".equals(typeName)) {
99 result = Double.valueOf(bdValue);
100 } else if (type.equals(Long.class) || "long".equals(typeName)) {
101 result = Long.valueOf(bdValue);
102 } else if (type.equals(BigDecimal.class)) {
103 result = new BigDecimal(bdValue);
104 } else if (type.equals(Integer.class) || "int".equals(typeName)) {
105 result = Integer.valueOf(bdValue);
106 } else if (type.equals(Boolean.class) || "boolean".equals(typeName)) {
107 result = Boolean.valueOf(bdValue);
108 } else if (type.isEnum()) {
109 @SuppressWarnings({ "rawtypes", "unchecked" })
110 final Class<? extends Enum> enumType = (Class<? extends Enum>) type;
111 @SuppressWarnings("unchecked")
112 Enum<?> enumValue = Enum.valueOf(enumType, value.toString());
120 * Callback by the {@link MqttBrokerConnection} if a matching topic received a new value.
121 * Because routing is already done by aforementioned class, the topic parameter is not checked again.
123 * @param topic The MQTT topic. Not used.
124 * @param payload The MQTT payload.
126 @SuppressWarnings({ "null", "unused" })
128 public void processMessage(String topic, byte[] payload) {
129 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
130 if (scheduledFuture != null) { // Cancel timeout
131 scheduledFuture.cancel(false);
132 this.scheduledFuture = null;
135 if (payload.length == 0) {
136 logger.debug("NULL payload on topic: {}", topic);
140 String valueStr = new String(payload, StandardCharsets.UTF_8);
141 String originalValueStr = valueStr;
143 // Check if there is a manipulation annotation attached to the field
145 final MQTTvalueTransform transform = field.getAnnotation(MQTTvalueTransform.class);
147 if (transform != null) {
148 // Add a prefix/suffix to the value
149 valueStr = transform.prefix() + valueStr + transform.suffix();
150 // Split the value if the field is an array. Convert numbers/enums if necessary.
151 value = field.getType().isArray() ? valueStr.split(transform.splitCharacter())
152 : numberConvert(valueStr, field.getType());
153 } else if (field.getType().isArray()) {
154 throw new IllegalArgumentException("No split character defined!");
156 // Convert numbers/enums if necessary
157 value = numberConvert(valueStr, field.getType());
159 receivedValue = true;
160 changeConsumer.fieldChanged(field, value);
161 future.complete(null);
162 } catch (IllegalArgumentException e) {
164 future.completeExceptionally(e);
166 logger.warn("Unable to interpret {} from topic {}", originalValueStr, topic);
167 future.complete(null);
172 void timeoutReached() {
174 future.completeExceptionally(new Exception("Did not receive mandatory topic value: " + topic));
176 future.complete(null);
181 * Subscribe to the MQTT topic. A {@link SubscribeFieldToMQTTtopic} cannot be stopped.
182 * You need to manually unsubscribe from the {@link #topic} before disposing.
184 * @param connection An MQTT connection.
185 * @param timeout Timeout in milliseconds. The returned future completes after this time even if no message has
186 * been received for the MQTT topic.
187 * @return Returns a future that completes if either a value is received for the topic or a timeout happens.
188 * @throws MqttException If an MQTT IO exception happens this exception is thrown.
190 public CompletableFuture<@Nullable Void> subscribeAndReceive(MqttBrokerConnection connection, int timeout) {
191 connection.subscribe(topic, this).exceptionally(e -> {
192 logger.debug("Failed to subscribe to topic {}", topic, e);
193 final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
194 if (scheduledFuture != null) { // Cancel timeout
195 scheduledFuture.cancel(false);
196 this.scheduledFuture = null;
198 future.complete(null);
201 if (!future.isDone()) {
202 this.scheduledFuture = scheduler.schedule(this::timeoutReached, timeout, TimeUnit.MILLISECONDS);
209 * Return true if the corresponding field has received a value at least once.
211 public boolean hasReceivedValue() {
212 return receivedValue;
216 * Return true if the corresponding field is mandatory.
218 public boolean isMandatory() {