]> git.basschouten.com Git - openhab-addons.git/blob
06ea08fbdbc531cdbf9f08f68fea1f3f2dcb0dc2
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23 import java.util.stream.Stream;
24
25 import org.eclipse.jdt.annotation.NonNullByDefault;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.openhab.binding.mqtt.generic.values.TextValue;
28 import org.openhab.binding.mqtt.generic.values.Value;
29 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
30 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
31 import org.openhab.core.library.types.DecimalType;
32 import org.openhab.core.library.types.QuantityType;
33 import org.openhab.core.library.types.StopMoveType;
34 import org.openhab.core.library.types.StringType;
35 import org.openhab.core.thing.ChannelUID;
36 import org.openhab.core.types.Command;
37 import org.openhab.core.types.State;
38 import org.openhab.core.types.Type;
39 import org.openhab.core.types.TypeParser;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
45  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
46  *
47  * @author David Graeff - Initial contribution
48  */
49 @NonNullByDefault
50 public class ChannelState implements MqttMessageSubscriber {
51     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
52
53     // Immutable channel configuration
54     protected final boolean readOnly;
55     protected final ChannelUID channelUID;
56     protected final ChannelConfig config;
57
58     /** Channel value **/
59     protected final Value cachedValue;
60
61     // Runtime variables
62     private @Nullable MqttBrokerConnection connection;
63     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
64     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
65     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
66     protected boolean hasSubscribed = false;
67     private @Nullable ScheduledFuture<?> scheduledFuture;
68     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
69     private final Object futureLock = new Object();
70
71     /**
72      * Creates a new channel state.
73      *
74      * @param config The channel configuration
75      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
76      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
77      *            needs a cache for the current value.
78      * @param channelStateUpdateListener A channel state update listener
79      */
80     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
81             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
82         this.config = config;
83         this.channelStateUpdateListener = channelStateUpdateListener;
84         this.channelUID = channelUID;
85         this.cachedValue = cachedValue;
86         this.readOnly = config.commandTopic.isBlank();
87     }
88
89     public boolean isReadOnly() {
90         return this.readOnly;
91     }
92
93     /**
94      * Add a transformation that is applied for each received MQTT topic value.
95      * The transformations are executed in order.
96      *
97      * @param transformation A transformation
98      */
99     public void addTransformation(ChannelStateTransformation transformation) {
100         transformationsIn.add(transformation);
101     }
102
103     public void addTransformation(String transformation, TransformationServiceProvider transformationServiceProvider) {
104         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformation(t));
105     }
106
107     /**
108      * Add a transformation that is applied for each value to be published.
109      * The transformations are executed in order.
110      *
111      * @param transformation A transformation
112      */
113     public void addTransformationOut(ChannelStateTransformation transformation) {
114         transformationsOut.add(transformation);
115     }
116
117     public void addTransformationOut(String transformation,
118             TransformationServiceProvider transformationServiceProvider) {
119         parseTransformation(transformation, transformationServiceProvider).forEach(t -> addTransformationOut(t));
120     }
121
122     public static Stream<ChannelStateTransformation> parseTransformation(String transformation,
123             TransformationServiceProvider transformationServiceProvider) {
124         String[] transformations = transformation.split("∩");
125         return Stream.of(transformations).filter(t -> !t.isBlank())
126                 .map(t -> new ChannelStateTransformation(t, transformationServiceProvider));
127     }
128
129     /**
130      * Clear transformations
131      */
132     public void clearTransformations() {
133         transformationsIn.clear();
134         transformationsOut.clear();
135     }
136
137     /**
138      * Returns the cached value state object of this message subscriber.
139      * <p>
140      * MQTT only notifies us once about a value, during the subscribe.
141      * The channel state therefore needs a cache for the current value.
142      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
143      * </p>
144      */
145     public Value getCache() {
146         return cachedValue;
147     }
148
149     /**
150      * Return the channelUID
151      */
152     public ChannelUID channelUID() {
153         return channelUID;
154     }
155
156     /**
157      * Incoming message from the MqttBrokerConnection
158      *
159      * @param topic The topic. Is the same as the field stateTopic.
160      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
161      */
162     @Override
163     public void processMessage(String topic, byte[] payload) {
164         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
165         if (channelStateUpdateListener == null) {
166             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
167             return;
168         }
169
170         if (cachedValue.isBinary()) {
171             cachedValue.update(payload);
172             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
173             receivedOrTimeout();
174             return;
175         }
176
177         // String value: Apply transformations
178         String strValue = new String(payload, StandardCharsets.UTF_8);
179         for (ChannelStateTransformation t : transformationsIn) {
180             String transformedValue = t.processValue(strValue);
181             if (transformedValue != null) {
182                 strValue = transformedValue;
183             } else {
184                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
185                 receivedOrTimeout();
186                 return;
187             }
188         }
189
190         // Is trigger?: Special handling
191         if (config.trigger) {
192             channelStateUpdateListener.triggerChannel(channelUID, strValue);
193             receivedOrTimeout();
194             return;
195         }
196
197         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
198         if (command == null) {
199             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
200                     cachedValue.getClass().getSimpleName());
201             receivedOrTimeout();
202             return;
203         }
204
205         Type parsedType;
206         // Map the string to a command, update the cached value and post the command to the framework
207         try {
208             parsedType = cachedValue.parseMessage(command);
209         } catch (IllegalArgumentException | IllegalStateException e) {
210             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
211                     cachedValue.getClass().getSimpleName(), e.getMessage());
212             receivedOrTimeout();
213             return;
214         }
215
216         if (parsedType instanceof State parsedState) {
217             cachedValue.update(parsedState);
218         } else {
219             // things that are only Commands _must_ be posted as a command (like STOP)
220             channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
221             receivedOrTimeout();
222             return;
223         }
224
225         State newState = cachedValue.getChannelState();
226         // If the user explicitly wants a command sent, not an update, do that. But
227         // we have to check that the state is even possible to send as a command
228         // (i.e. not UNDEF)
229         if (config.postCommand && newState instanceof Command newCommand) {
230             channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
231         } else {
232             channelStateUpdateListener.updateChannelState(channelUID, newState);
233         }
234         receivedOrTimeout();
235     }
236
237     /**
238      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
239      */
240     public String getStateTopic() {
241         return config.stateTopic;
242     }
243
244     /**
245      * Return the command topic. Might be an empty string, if this is a read-only channel.
246      */
247     public String getCommandTopic() {
248         return config.commandTopic;
249     }
250
251     /**
252      * Returns the channelType ID which also happens to be an item-type
253      */
254     public String getItemType() {
255         return cachedValue.getItemType();
256     }
257
258     /**
259      * Returns true if this is a stateful channel.
260      */
261     public boolean isStateful() {
262         return config.retained;
263     }
264
265     /**
266      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
267      *
268      * @return A future that completes with true if unsubscribing from the state topic succeeded.
269      *         It completes with false if no connection is established and completes exceptionally otherwise.
270      */
271     public CompletableFuture<@Nullable Void> stop() {
272         final MqttBrokerConnection connection = this.connection;
273         if (connection != null && !config.stateTopic.isBlank()) {
274             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
275         } else {
276             internalStop();
277             return CompletableFuture.completedFuture(null);
278         }
279     }
280
281     private void internalStop() {
282         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
283         this.connection = null;
284         this.channelStateUpdateListener = null;
285         hasSubscribed = false;
286         cachedValue.resetState();
287     }
288
289     private void receivedOrTimeout() {
290         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
291         if (scheduledFuture != null) { // Cancel timeout
292             scheduledFuture.cancel(false);
293             this.scheduledFuture = null;
294         }
295         future.complete(null);
296     }
297
298     private @Nullable Void subscribeFail(Throwable e) {
299         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
300         if (scheduledFuture != null) { // Cancel timeout
301             scheduledFuture.cancel(false);
302             this.scheduledFuture = null;
303         }
304         future.completeExceptionally(e);
305         return null;
306     }
307
308     /**
309      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
310      *
311      * @param connection A broker connection
312      * @param scheduler A scheduler to realize the timeout
313      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
314      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
315      *         and exceptionally otherwise.
316      */
317     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
318             int timeout) {
319         synchronized (futureLock) {
320             // if the connection is still the same, the subscription is still present, otherwise we need to renew
321             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
322                 return future;
323             }
324             hasSubscribed = false;
325
326             this.connection = connection;
327
328             if (config.stateTopic.isBlank()) {
329                 return CompletableFuture.completedFuture(null);
330             }
331
332             this.future = new CompletableFuture<>();
333         }
334         connection.subscribe(config.stateTopic, this).thenRun(() -> {
335             hasSubscribed = true;
336             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
337             if (timeout > 0 && !future.isDone()) {
338                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
339             } else {
340                 receivedOrTimeout();
341             }
342         }).exceptionally(this::subscribeFail);
343         return future;
344     }
345
346     /**
347      * Return true if this channel has subscribed to its MQTT topics.
348      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
349      * have a stateTopic set, to subscribe this channel.
350      */
351     public boolean hasSubscribed() {
352         return this.hasSubscribed;
353     }
354
355     /**
356      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
357      *
358      * @param command The command to send
359      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
360      *         and exceptionally otherwise.
361      */
362     public CompletableFuture<Boolean> publishValue(Command command) {
363         final MqttBrokerConnection connection = this.connection;
364
365         if (connection == null) {
366             CompletableFuture<Boolean> f = new CompletableFuture<>();
367             f.completeExceptionally(new IllegalStateException(
368                     "The connection object has not been set. start() should have been called!"));
369             return f;
370         }
371
372         Command mqttCommandValue = cachedValue.parseCommand(command);
373         Value mqttFormatter = cachedValue;
374
375         if (readOnly) {
376             logger.debug(
377                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
378                     mqttCommandValue, config.commandTopic);
379             return CompletableFuture.completedFuture(false);
380         }
381
382         // Outgoing transformations
383         for (ChannelStateTransformation t : transformationsOut) {
384             Command cValue = mqttCommandValue;
385             // Only pass numeric value for QuantityType.
386             if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
387                 cValue = new DecimalType(qtCommandValue.toBigDecimal());
388
389             }
390             String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
391             String transformedValue = t.processValue(commandString);
392             if (transformedValue != null) {
393                 mqttFormatter = new TextValue();
394                 mqttCommandValue = new StringType(transformedValue);
395             } else {
396                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
397                         t.serviceName);
398                 return CompletableFuture.completedFuture(false);
399             }
400         }
401
402         String commandString;
403
404         // Formatter: Applied before the channel state value is published to the MQTT broker.
405         if (config.formatBeforePublish.length() > 0) {
406             try {
407                 Command cValue = mqttCommandValue;
408                 // Only pass numeric value for QuantityType of format pattern is %s.
409                 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
410                         && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
411                     cValue = new DecimalType(qtCommandValue.toBigDecimal());
412                 }
413                 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
414             } catch (IllegalFormatException e) {
415                 logger.debug("Format pattern incorrect for {}", channelUID, e);
416                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
417             }
418         } else {
419             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
420         }
421
422         int qos = (config.qos != null) ? config.qos : connection.getQos();
423
424         String commandTopic;
425         if (command.equals(StopMoveType.STOP) && !config.stopCommandTopic.isEmpty()) {
426             commandTopic = config.stopCommandTopic;
427         } else {
428             commandTopic = config.commandTopic;
429         }
430         return connection.publish(commandTopic, commandString.getBytes(), qos, config.retained);
431     }
432
433     /**
434      * @return The channelStateUpdateListener
435      */
436     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
437         return channelStateUpdateListener;
438     }
439
440     /**
441      * @param channelStateUpdateListener The channelStateUpdateListener to set
442      */
443     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
444         this.channelStateUpdateListener = channelStateUpdateListener;
445     }
446
447     public @Nullable MqttBrokerConnection getConnection() {
448         return connection;
449     }
450
451     /**
452      * This is for tests only to inject a broker connection. Use
453      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
454      *
455      * @param connection MQTT Broker connection
456      */
457     public void setConnection(MqttBrokerConnection connection) {
458         this.connection = connection;
459     }
460 }