]> git.basschouten.com Git - openhab-addons.git/blob
681e5073e9af0d86cbb521b0d405af7c7a123004
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.IllegalFormatException;
17 import java.util.Optional;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.values.TextValue;
26 import org.openhab.binding.mqtt.generic.values.Value;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
29 import org.openhab.core.library.types.DecimalType;
30 import org.openhab.core.library.types.QuantityType;
31 import org.openhab.core.library.types.StopMoveType;
32 import org.openhab.core.library.types.StringType;
33 import org.openhab.core.thing.ChannelUID;
34 import org.openhab.core.thing.binding.generic.ChannelTransformation;
35 import org.openhab.core.types.Command;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.Type;
38 import org.openhab.core.types.TypeParser;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
44  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
45  *
46  * @author David Graeff - Initial contribution
47  */
48 @NonNullByDefault
49 public class ChannelState implements MqttMessageSubscriber {
50     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
51
52     // Immutable channel configuration
53     protected final boolean readOnly;
54     protected final ChannelConfig config;
55
56     /** Channel value **/
57     protected final Value cachedValue;
58
59     // Runtime variables
60     protected ChannelUID channelUID;
61     private @Nullable MqttBrokerConnection connection;
62     protected final ChannelTransformation incomingTransformation;
63     protected final ChannelTransformation outgoingTransformation;
64     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
65     protected boolean hasSubscribed = false;
66     private @Nullable ScheduledFuture<?> scheduledFuture;
67     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
68     private final Object futureLock = new Object();
69
70     /**
71      * Creates a new channel state.
72      *
73      * @param config The channel configuration
74      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
75      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
76      *            needs a cache for the current value.
77      * @param channelStateUpdateListener A channel state update listener
78      */
79     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
80             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
81         this(config, channelUID, cachedValue, channelStateUpdateListener,
82                 new ChannelTransformation(config.transformationPattern),
83                 new ChannelTransformation(config.transformationPatternOut));
84     }
85
86     /**
87      * Creates a new channel state.
88      *
89      * @param config The channel configuration
90      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
91      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
92      *            needs a cache for the current value.
93      * @param channelStateUpdateListener A channel state update listener
94      * @param incomingTransformation A transformation to apply to incoming values
95      * @param outgoingTransformation A transformation to apply to outgoing values
96      */
97     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
98             @Nullable ChannelStateUpdateListener channelStateUpdateListener,
99             @Nullable ChannelTransformation incomingTransformation,
100             @Nullable ChannelTransformation outgoingTransformation) {
101         this.config = config;
102         this.channelStateUpdateListener = channelStateUpdateListener;
103         this.channelUID = channelUID;
104         this.cachedValue = cachedValue;
105         this.readOnly = config.commandTopic.isBlank();
106         this.incomingTransformation = incomingTransformation == null ? new ChannelTransformation((String) null)
107                 : incomingTransformation;
108         this.outgoingTransformation = outgoingTransformation == null ? new ChannelTransformation((String) null)
109                 : outgoingTransformation;
110     }
111
112     public boolean isReadOnly() {
113         return this.readOnly;
114     }
115
116     /**
117      * Returns the cached value state object of this message subscriber.
118      * <p>
119      * MQTT only notifies us once about a value, during the subscribe.
120      * The channel state therefore needs a cache for the current value.
121      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
122      * </p>
123      */
124     public Value getCache() {
125         return cachedValue;
126     }
127
128     /**
129      * Return the channelUID
130      */
131     public ChannelUID channelUID() {
132         return channelUID;
133     }
134
135     // If the UID of the channel changed after it was initially created
136     public void setChannelUID(ChannelUID channelUID) {
137         this.channelUID = channelUID;
138     }
139
140     /**
141      * Incoming message from the MqttBrokerConnection
142      *
143      * @param topic The topic. Is the same as the field stateTopic.
144      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
145      */
146     @Override
147     public void processMessage(String topic, byte[] payload) {
148         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
149         if (channelStateUpdateListener == null) {
150             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
151             return;
152         }
153
154         if (cachedValue.isBinary()) {
155             cachedValue.update(payload);
156             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
157             receivedOrTimeout();
158             return;
159         }
160
161         // String value: Apply transformations
162         String strValue = new String(payload, StandardCharsets.UTF_8);
163         if (incomingTransformation.isPresent()) {
164             Optional<String> transformedValue = incomingTransformation.apply(strValue);
165             if (transformedValue.isEmpty()) {
166                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue,
167                         incomingTransformation);
168                 receivedOrTimeout();
169                 return;
170             }
171             strValue = transformedValue.get();
172         }
173
174         // Is trigger?: Special handling
175         if (config.trigger) {
176             channelStateUpdateListener.triggerChannel(channelUID, strValue);
177             receivedOrTimeout();
178             return;
179         }
180
181         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
182         if (command == null) {
183             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
184                     cachedValue.getClass().getSimpleName());
185             receivedOrTimeout();
186             return;
187         }
188
189         Type parsedType;
190         // Map the string to a command, update the cached value and post the command to the framework
191         try {
192             parsedType = cachedValue.parseMessage(command);
193         } catch (IllegalArgumentException | IllegalStateException e) {
194             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
195                     cachedValue.getClass().getSimpleName(), e.getMessage());
196             receivedOrTimeout();
197             return;
198         }
199
200         if (parsedType instanceof State parsedState) {
201             cachedValue.update(parsedState);
202         } else {
203             // things that are only Commands _must_ be posted as a command (like STOP)
204             channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
205             receivedOrTimeout();
206             return;
207         }
208
209         State newState = cachedValue.getChannelState();
210         // If the user explicitly wants a command sent, not an update, do that. But
211         // we have to check that the state is even possible to send as a command
212         // (i.e. not UNDEF)
213         if (config.postCommand && newState instanceof Command newCommand) {
214             channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
215         } else {
216             channelStateUpdateListener.updateChannelState(channelUID, newState);
217         }
218         receivedOrTimeout();
219     }
220
221     /**
222      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
223      */
224     public String getStateTopic() {
225         return config.stateTopic;
226     }
227
228     /**
229      * Return the command topic. Might be an empty string, if this is a read-only channel.
230      */
231     public String getCommandTopic() {
232         return config.commandTopic;
233     }
234
235     /**
236      * Returns the channelType ID which also happens to be an item-type
237      */
238     public String getItemType() {
239         return cachedValue.getItemType();
240     }
241
242     /**
243      * Returns true if this is a stateful channel.
244      */
245     public boolean isStateful() {
246         return config.retained;
247     }
248
249     /**
250      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
251      *
252      * @return A future that completes with true if unsubscribing from the state topic succeeded.
253      *         It completes with false if no connection is established and completes exceptionally otherwise.
254      */
255     public CompletableFuture<@Nullable Void> stop() {
256         final MqttBrokerConnection connection = this.connection;
257         if (connection != null && !config.stateTopic.isBlank()) {
258             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
259         } else {
260             internalStop();
261             return CompletableFuture.completedFuture(null);
262         }
263     }
264
265     private void internalStop() {
266         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
267         this.connection = null;
268         this.channelStateUpdateListener = null;
269         hasSubscribed = false;
270         cachedValue.resetState();
271     }
272
273     private void receivedOrTimeout() {
274         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
275         if (scheduledFuture != null) { // Cancel timeout
276             scheduledFuture.cancel(false);
277             this.scheduledFuture = null;
278         }
279         future.complete(null);
280     }
281
282     private @Nullable Void subscribeFail(Throwable e) {
283         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
284         if (scheduledFuture != null) { // Cancel timeout
285             scheduledFuture.cancel(false);
286             this.scheduledFuture = null;
287         }
288         future.completeExceptionally(e);
289         return null;
290     }
291
292     /**
293      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
294      *
295      * @param connection A broker connection
296      * @param scheduler A scheduler to realize the timeout
297      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
298      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
299      *         and exceptionally otherwise.
300      */
301     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
302             int timeout) {
303         synchronized (futureLock) {
304             // if the connection is still the same, the subscription is still present, otherwise we need to renew
305             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
306                 return future;
307             }
308             hasSubscribed = false;
309
310             this.connection = connection;
311
312             if (config.stateTopic.isBlank()) {
313                 return CompletableFuture.completedFuture(null);
314             }
315
316             this.future = new CompletableFuture<>();
317         }
318         connection.subscribe(config.stateTopic, this).thenRun(() -> {
319             hasSubscribed = true;
320             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
321             if (timeout > 0 && !future.isDone()) {
322                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
323             } else {
324                 receivedOrTimeout();
325             }
326         }).exceptionally(this::subscribeFail);
327         return future;
328     }
329
330     /**
331      * Return true if this channel has subscribed to its MQTT topics.
332      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
333      * have a stateTopic set, to subscribe this channel.
334      */
335     public boolean hasSubscribed() {
336         return this.hasSubscribed;
337     }
338
339     /**
340      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
341      *
342      * @param command The command to send
343      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
344      *         and exceptionally otherwise.
345      */
346     public CompletableFuture<Boolean> publishValue(Command command) {
347         final MqttBrokerConnection connection = this.connection;
348
349         if (connection == null) {
350             CompletableFuture<Boolean> f = new CompletableFuture<>();
351             f.completeExceptionally(new IllegalStateException(
352                     "The connection object has not been set. start() should have been called!"));
353             return f;
354         }
355
356         Command mqttCommandValue = cachedValue.parseCommand(command);
357         Value mqttFormatter = cachedValue;
358
359         if (readOnly) {
360             logger.debug(
361                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
362                     mqttCommandValue, config.commandTopic);
363             return CompletableFuture.completedFuture(false);
364         }
365
366         // Outgoing transformations
367         if (outgoingTransformation.isPresent()) {
368             Command cValue = mqttCommandValue;
369             // Only pass numeric value for QuantityType.
370             if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
371                 cValue = new DecimalType(qtCommandValue.toBigDecimal());
372             }
373             String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
374             Optional<String> transformedValue = outgoingTransformation.apply(commandString);
375             if (transformedValue.isEmpty()) {
376                 logger.debug("Transformation '{}' returned null on '{}', discarding message", outgoingTransformation,
377                         commandString);
378                 return CompletableFuture.completedFuture(false);
379             }
380
381             mqttFormatter = new TextValue();
382             mqttCommandValue = new StringType(transformedValue.get());
383         }
384
385         String commandString;
386
387         // Formatter: Applied before the channel state value is published to the MQTT broker.
388         if (config.formatBeforePublish.length() > 0) {
389             try {
390                 Command cValue = mqttCommandValue;
391                 // Only pass numeric value for QuantityType of format pattern is %s.
392                 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
393                         && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
394                     cValue = new DecimalType(qtCommandValue.toBigDecimal());
395                 }
396                 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
397             } catch (IllegalFormatException e) {
398                 logger.debug("Format pattern incorrect for {}", channelUID, e);
399                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
400             }
401         } else {
402             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
403         }
404
405         int qos = (config.qos != null) ? config.qos : connection.getQos();
406
407         String commandTopic;
408         if (command.equals(StopMoveType.STOP) && !config.stopCommandTopic.isEmpty()) {
409             commandTopic = config.stopCommandTopic;
410         } else {
411             commandTopic = config.commandTopic;
412         }
413         return connection.publish(commandTopic, commandString.getBytes(), qos, config.retained);
414     }
415
416     /**
417      * @return The channelStateUpdateListener
418      */
419     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
420         return channelStateUpdateListener;
421     }
422
423     /**
424      * @param channelStateUpdateListener The channelStateUpdateListener to set
425      */
426     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
427         this.channelStateUpdateListener = channelStateUpdateListener;
428     }
429
430     public @Nullable MqttBrokerConnection getConnection() {
431         return connection;
432     }
433
434     /**
435      * This is for tests only to inject a broker connection. Use
436      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
437      *
438      * @param connection MQTT Broker connection
439      */
440     public void setConnection(MqttBrokerConnection connection) {
441         this.connection = connection;
442     }
443 }