]> git.basschouten.com Git - openhab-addons.git/blob
832f94397e13a39cf52b4a2394ddc332e133cf37
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.DecimalType;
32 import org.openhab.core.library.types.QuantityType;
33 import org.openhab.core.library.types.StringType;
34 import org.openhab.core.thing.ChannelUID;
35 import org.openhab.core.types.Command;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.Type;
38 import org.openhab.core.types.TypeParser;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
44  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
45  *
46  * @author David Graeff - Initial contribution
47  */
48 @NonNullByDefault
49 public class ChannelState implements MqttMessageSubscriber {
50     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
51
52     // Immutable channel configuration
53     protected final boolean readOnly;
54     protected final ChannelUID channelUID;
55     protected final ChannelConfig config;
56
57     /** Channel value **/
58     protected final Value cachedValue;
59
60     // Runtime variables
61     private @Nullable MqttBrokerConnection connection;
62     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
63     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
64     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
65     protected boolean hasSubscribed = false;
66     private @Nullable ScheduledFuture<?> scheduledFuture;
67     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
68     private final Object futureLock = new Object();
69
70     /**
71      * Creates a new channel state.
72      *
73      * @param config The channel configuration
74      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
75      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
76      *            needs a cache for the current value.
77      * @param channelStateUpdateListener A channel state update listener
78      */
79     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
80             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
81         this.config = config;
82         this.channelStateUpdateListener = channelStateUpdateListener;
83         this.channelUID = channelUID;
84         this.cachedValue = cachedValue;
85         this.readOnly = config.commandTopic.isBlank();
86     }
87
88     public boolean isReadOnly() {
89         return this.readOnly;
90     }
91
92     /**
93      * Add a transformation that is applied for each received MQTT topic value.
94      * The transformations are executed in order.
95      *
96      * @param transformation A transformation
97      */
98     public void addTransformation(ChannelStateTransformation transformation) {
99         transformationsIn.add(transformation);
100     }
101
102     public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
103         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
104     }
105
106     /**
107      * Add a transformation that is applied for each value to be published.
108      * The transformations are executed in order.
109      *
110      * @param transformation A transformation
111      */
112     public void addTransformationOut(ChannelStateTransformation transformation) {
113         transformationsOut.add(transformation);
114     }
115
116     public void addTransformationOut(String transformation,
117             TransformationServiceProvider transformationServiceProvider) {
118         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
119     }
120
121     public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
122             TransformationServiceProvider transformationServiceProvider) {
123         String[] transformations = transformation.split("∩");
124         return Stream.of(transformations).filter(t -> !t.isBlank())
125                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
126     }
127
128     /**
129      * Clear transformations
130      */
131     public void clearTransformations() {
132         transformationsIn.clear();
133         transformationsOut.clear();
134     }
135
136     /**
137      * Returns the cached value state object of this message subscriber.
138      * <p>
139      * MQTT only notifies us once about a value, during the subscribe.
140      * The channel state therefore needs a cache for the current value.
141      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
142      * </p>
143      */
144     public Value getCache() {
145         return cachedValue;
146     }
147
148     /**
149      * Return the channelUID
150      */
151     public ChannelUID channelUID() {
152         return channelUID;
153     }
154
155     /**
156      * Incoming message from the MqttBrokerConnection
157      *
158      * @param topic The topic. Is the same as the field stateTopic.
159      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
160      */
161     @Override
162     public void processMessage(String topic, byte[] payload) {
163         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
164         if (channelStateUpdateListener == null) {
165             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
166             return;
167         }
168
169         if (cachedValue.isBinary()) {
170             cachedValue.update(payload);
171             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
172             receivedOrTimeout();
173             return;
174         }
175
176         // String value: Apply transformations
177         String strValue = new String(payload, StandardCharsets.UTF_8);
178         for (ChannelStateTransformation t : transformationsIn) {
179             String transformedValue = t.processValue(strValue);
180             if (transformedValue != null) {
181                 strValue = transformedValue;
182             } else {
183                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
184                 receivedOrTimeout();
185                 return;
186             }
187         }
188
189         // Is trigger?: Special handling
190         if (config.trigger) {
191             channelStateUpdateListener.triggerChannel(channelUID, strValue);
192             receivedOrTimeout();
193             return;
194         }
195
196         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
197         if (command == null) {
198             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
199                     cachedValue.getClass().getSimpleName());
200             receivedOrTimeout();
201             return;
202         }
203
204         Type parsedType;
205         // Map the string to a command, update the cached value and post the command to the framework
206         try {
207             parsedType = cachedValue.parseMessage(command);
208         } catch (IllegalArgumentException | IllegalStateException e) {
209             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
210                     cachedValue.getClass().getSimpleName(), e.getMessage());
211             receivedOrTimeout();
212             return;
213         }
214
215         if (parsedType instanceof State parsedState) {
216             cachedValue.update(parsedState);
217         } else {
218             // things that are only Commands _must_ be posted as a command (like STOP)
219             channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
220             receivedOrTimeout();
221             return;
222         }
223
224         State newState = cachedValue.getChannelState();
225         // If the user explicitly wants a command sent, not an update, do that. But
226         // we have to check that the state is even possible to send as a command
227         // (i.e. not UNDEF)
228         if (config.postCommand && newState instanceof Command newCommand) {
229             channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
230         } else {
231             channelStateUpdateListener.updateChannelState(channelUID, newState);
232         }
233         receivedOrTimeout();
234     }
235
236     /**
237      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
238      */
239     public String getStateTopic() {
240         return config.stateTopic;
241     }
242
243     /**
244      * Return the command topic. Might be an empty string, if this is a read-only channel.
245      */
246     public String getCommandTopic() {
247         return config.commandTopic;
248     }
249
250     /**
251      * Returns the channelType ID which also happens to be an item-type
252      */
253     public String getItemType() {
254         return cachedValue.getItemType();
255     }
256
257     /**
258      * Returns true if this is a stateful channel.
259      */
260     public boolean isStateful() {
261         return config.retained;
262     }
263
264     /**
265      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
266      *
267      * @return A future that completes with true if unsubscribing from the state topic succeeded.
268      *         It completes with false if no connection is established and completes exceptionally otherwise.
269      */
270     public CompletableFuture<@Nullable Void> stop() {
271         final MqttBrokerConnection connection = this.connection;
272         if (connection != null && !config.stateTopic.isBlank()) {
273             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
274         } else {
275             internalStop();
276             return CompletableFuture.completedFuture(null);
277         }
278     }
279
280     private void internalStop() {
281         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
282         this.connection = null;
283         this.channelStateUpdateListener = null;
284         hasSubscribed = false;
285         cachedValue.resetState();
286     }
287
288     private void receivedOrTimeout() {
289         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
290         if (scheduledFuture != null) { // Cancel timeout
291             scheduledFuture.cancel(false);
292             this.scheduledFuture = null;
293         }
294         future.complete(null);
295     }
296
297     private @Nullable Void subscribeFail(Throwable e) {
298         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
299         if (scheduledFuture != null) { // Cancel timeout
300             scheduledFuture.cancel(false);
301             this.scheduledFuture = null;
302         }
303         future.completeExceptionally(e);
304         return null;
305     }
306
307     /**
308      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
309      *
310      * @param connection A broker connection
311      * @param scheduler A scheduler to realize the timeout
312      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
313      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
314      *         and exceptionally otherwise.
315      */
316     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
317             int timeout) {
318         synchronized (futureLock) {
319             // if the connection is still the same, the subscription is still present, otherwise we need to renew
320             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
321                 return future;
322             }
323             hasSubscribed = false;
324
325             this.connection = connection;
326
327             if (config.stateTopic.isBlank()) {
328                 return CompletableFuture.completedFuture(null);
329             }
330
331             this.future = new CompletableFuture<>();
332         }
333         connection.subscribe(config.stateTopic, this).thenRun(() -> {
334             hasSubscribed = true;
335             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
336             if (timeout > 0 && !future.isDone()) {
337                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
338             } else {
339                 receivedOrTimeout();
340             }
341         }).exceptionally(this::subscribeFail);
342         return future;
343     }
344
345     /**
346      * Return true if this channel has subscribed to its MQTT topics.
347      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
348      * have a stateTopic set, to subscribe this channel.
349      */
350     public boolean hasSubscribed() {
351         return this.hasSubscribed;
352     }
353
354     /**
355      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
356      *
357      * @param command The command to send
358      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
359      *         and exceptionally otherwise.
360      */
361     public CompletableFuture<Boolean> publishValue(Command command) {
362         final MqttBrokerConnection connection = this.connection;
363
364         if (connection == null) {
365             CompletableFuture<Boolean> f = new CompletableFuture<>();
366             f.completeExceptionally(new IllegalStateException(
367                     "The connection object has not been set. start() should have been called!"));
368             return f;
369         }
370
371         Command mqttCommandValue = cachedValue.parseCommand(command);
372         Value mqttFormatter = cachedValue;
373
374         if (readOnly) {
375             logger.debug(
376                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
377                     mqttCommandValue, config.commandTopic);
378             return CompletableFuture.completedFuture(false);
379         }
380
381         // Outgoing transformations
382         for (ChannelStateTransformation t : transformationsOut) {
383             Command cValue = mqttCommandValue;
384             // Only pass numeric value for QuantityType.
385             if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
386                 cValue = new DecimalType(qtCommandValue.toBigDecimal());
387
388             }
389             String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
390             String transformedValue = t.processValue(commandString);
391             if (transformedValue != null) {
392                 mqttFormatter = new TextValue();
393                 mqttCommandValue = new StringType(transformedValue);
394             } else {
395                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
396                         t.serviceName);
397                 return CompletableFuture.completedFuture(false);
398             }
399         }
400
401         String commandString;
402
403         // Formatter: Applied before the channel state value is published to the MQTT broker.
404         if (config.formatBeforePublish.length() > 0) {
405             try {
406                 Command cValue = mqttCommandValue;
407                 // Only pass numeric value for QuantityType of format pattern is %s.
408                 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
409                         && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
410                     cValue = new DecimalType(qtCommandValue.toBigDecimal());
411                 }
412                 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
413             } catch (IllegalFormatException e) {
414                 logger.debug("Format pattern incorrect for {}", channelUID, e);
415                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
416             }
417         } else {
418             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
419         }
420
421         int qos = (config.qos != null) ? config.qos : connection.getQos();
422
423         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
424     }
425
426     /**
427      * @return The channelStateUpdateListener
428      */
429     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
430         return channelStateUpdateListener;
431     }
432
433     /**
434      * @param channelStateUpdateListener The channelStateUpdateListener to set
435      */
436     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
437         this.channelStateUpdateListener = channelStateUpdateListener;
438     }
439
440     public @Nullable MqttBrokerConnection getConnection() {
441         return connection;
442     }
443
444     /**
445      * This is for tests only to inject a broker connection. Use
446      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
447      *
448      * @param connection MQTT Broker connection
449      */
450     public void setConnection(MqttBrokerConnection connection) {
451         this.connection = connection;
452     }
453 }