]> git.basschouten.com Git - openhab-addons.git/blob
e1d5da2ef590ebe4302b6c5ecc8c561dd1ddfd85
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2024 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.IllegalFormatException;
17 import java.util.Optional;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.openhab.binding.mqtt.generic.values.TextValue;
26 import org.openhab.binding.mqtt.generic.values.Value;
27 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
28 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
29 import org.openhab.core.library.types.DecimalType;
30 import org.openhab.core.library.types.QuantityType;
31 import org.openhab.core.library.types.StopMoveType;
32 import org.openhab.core.library.types.StringType;
33 import org.openhab.core.thing.ChannelUID;
34 import org.openhab.core.thing.binding.generic.ChannelTransformation;
35 import org.openhab.core.types.Command;
36 import org.openhab.core.types.State;
37 import org.openhab.core.types.Type;
38 import org.openhab.core.types.TypeParser;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * This object consists of a {@link Value}, which is updated on the respective MQTT topic change.
44  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
45  *
46  * @author David Graeff - Initial contribution
47  */
48 @NonNullByDefault
49 public class ChannelState implements MqttMessageSubscriber {
50     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
51
52     // Immutable channel configuration
53     protected final boolean readOnly;
54     protected final ChannelUID channelUID;
55     protected final ChannelConfig config;
56
57     /** Channel value **/
58     protected final Value cachedValue;
59
60     // Runtime variables
61     private @Nullable MqttBrokerConnection connection;
62     protected final ChannelTransformation incomingTransformation;
63     protected final ChannelTransformation outgoingTransformation;
64     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
65     protected boolean hasSubscribed = false;
66     private @Nullable ScheduledFuture<?> scheduledFuture;
67     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
68     private final Object futureLock = new Object();
69
70     /**
71      * Creates a new channel state.
72      *
73      * @param config The channel configuration
74      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
75      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
76      *            needs a cache for the current value.
77      * @param channelStateUpdateListener A channel state update listener
78      */
79     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
80             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
81         this.config = config;
82         this.channelStateUpdateListener = channelStateUpdateListener;
83         this.channelUID = channelUID;
84         this.cachedValue = cachedValue;
85         this.readOnly = config.commandTopic.isBlank();
86         this.incomingTransformation = new ChannelTransformation(config.transformationPattern);
87         this.outgoingTransformation = new ChannelTransformation(config.transformationPatternOut);
88     }
89
90     public boolean isReadOnly() {
91         return this.readOnly;
92     }
93
94     /**
95      * Returns the cached value state object of this message subscriber.
96      * <p>
97      * MQTT only notifies us once about a value, during the subscribe.
98      * The channel state therefore needs a cache for the current value.
99      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
100      * </p>
101      */
102     public Value getCache() {
103         return cachedValue;
104     }
105
106     /**
107      * Return the channelUID
108      */
109     public ChannelUID channelUID() {
110         return channelUID;
111     }
112
113     /**
114      * Incoming message from the MqttBrokerConnection
115      *
116      * @param topic The topic. Is the same as the field stateTopic.
117      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
118      */
119     @Override
120     public void processMessage(String topic, byte[] payload) {
121         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
122         if (channelStateUpdateListener == null) {
123             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
124             return;
125         }
126
127         if (cachedValue.isBinary()) {
128             cachedValue.update(payload);
129             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
130             receivedOrTimeout();
131             return;
132         }
133
134         // String value: Apply transformations
135         String strValue = new String(payload, StandardCharsets.UTF_8);
136         if (incomingTransformation.isPresent()) {
137             Optional<String> transformedValue = incomingTransformation.apply(strValue);
138             if (transformedValue.isEmpty()) {
139                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue,
140                         incomingTransformation);
141                 receivedOrTimeout();
142                 return;
143             }
144             strValue = transformedValue.get();
145         }
146
147         // Is trigger?: Special handling
148         if (config.trigger) {
149             channelStateUpdateListener.triggerChannel(channelUID, strValue);
150             receivedOrTimeout();
151             return;
152         }
153
154         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
155         if (command == null) {
156             logger.warn("Incoming payload '{}' on '{}' not supported by type '{}'", strValue, topic,
157                     cachedValue.getClass().getSimpleName());
158             receivedOrTimeout();
159             return;
160         }
161
162         Type parsedType;
163         // Map the string to a command, update the cached value and post the command to the framework
164         try {
165             parsedType = cachedValue.parseMessage(command);
166         } catch (IllegalArgumentException | IllegalStateException e) {
167             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
168                     cachedValue.getClass().getSimpleName(), e.getMessage());
169             receivedOrTimeout();
170             return;
171         }
172
173         if (parsedType instanceof State parsedState) {
174             cachedValue.update(parsedState);
175         } else {
176             // things that are only Commands _must_ be posted as a command (like STOP)
177             channelStateUpdateListener.postChannelCommand(channelUID, (Command) parsedType);
178             receivedOrTimeout();
179             return;
180         }
181
182         State newState = cachedValue.getChannelState();
183         // If the user explicitly wants a command sent, not an update, do that. But
184         // we have to check that the state is even possible to send as a command
185         // (i.e. not UNDEF)
186         if (config.postCommand && newState instanceof Command newCommand) {
187             channelStateUpdateListener.postChannelCommand(channelUID, newCommand);
188         } else {
189             channelStateUpdateListener.updateChannelState(channelUID, newState);
190         }
191         receivedOrTimeout();
192     }
193
194     /**
195      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
196      */
197     public String getStateTopic() {
198         return config.stateTopic;
199     }
200
201     /**
202      * Return the command topic. Might be an empty string, if this is a read-only channel.
203      */
204     public String getCommandTopic() {
205         return config.commandTopic;
206     }
207
208     /**
209      * Returns the channelType ID which also happens to be an item-type
210      */
211     public String getItemType() {
212         return cachedValue.getItemType();
213     }
214
215     /**
216      * Returns true if this is a stateful channel.
217      */
218     public boolean isStateful() {
219         return config.retained;
220     }
221
222     /**
223      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
224      *
225      * @return A future that completes with true if unsubscribing from the state topic succeeded.
226      *         It completes with false if no connection is established and completes exceptionally otherwise.
227      */
228     public CompletableFuture<@Nullable Void> stop() {
229         final MqttBrokerConnection connection = this.connection;
230         if (connection != null && !config.stateTopic.isBlank()) {
231             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
232         } else {
233             internalStop();
234             return CompletableFuture.completedFuture(null);
235         }
236     }
237
238     private void internalStop() {
239         logger.debug("Unsubscribed channel {} from topic: {}", this.channelUID, config.stateTopic);
240         this.connection = null;
241         this.channelStateUpdateListener = null;
242         hasSubscribed = false;
243         cachedValue.resetState();
244     }
245
246     private void receivedOrTimeout() {
247         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
248         if (scheduledFuture != null) { // Cancel timeout
249             scheduledFuture.cancel(false);
250             this.scheduledFuture = null;
251         }
252         future.complete(null);
253     }
254
255     private @Nullable Void subscribeFail(Throwable e) {
256         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
257         if (scheduledFuture != null) { // Cancel timeout
258             scheduledFuture.cancel(false);
259             this.scheduledFuture = null;
260         }
261         future.completeExceptionally(e);
262         return null;
263     }
264
265     /**
266      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
267      *
268      * @param connection A broker connection
269      * @param scheduler A scheduler to realize the timeout
270      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
271      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
272      *         and exceptionally otherwise.
273      */
274     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
275             int timeout) {
276         synchronized (futureLock) {
277             // if the connection is still the same, the subscription is still present, otherwise we need to renew
278             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
279                 return future;
280             }
281             hasSubscribed = false;
282
283             this.connection = connection;
284
285             if (config.stateTopic.isBlank()) {
286                 return CompletableFuture.completedFuture(null);
287             }
288
289             this.future = new CompletableFuture<>();
290         }
291         connection.subscribe(config.stateTopic, this).thenRun(() -> {
292             hasSubscribed = true;
293             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
294             if (timeout > 0 && !future.isDone()) {
295                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
296             } else {
297                 receivedOrTimeout();
298             }
299         }).exceptionally(this::subscribeFail);
300         return future;
301     }
302
303     /**
304      * Return true if this channel has subscribed to its MQTT topics.
305      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
306      * have a stateTopic set, to subscribe this channel.
307      */
308     public boolean hasSubscribed() {
309         return this.hasSubscribed;
310     }
311
312     /**
313      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
314      *
315      * @param command The command to send
316      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
317      *         and exceptionally otherwise.
318      */
319     public CompletableFuture<Boolean> publishValue(Command command) {
320         final MqttBrokerConnection connection = this.connection;
321
322         if (connection == null) {
323             CompletableFuture<Boolean> f = new CompletableFuture<>();
324             f.completeExceptionally(new IllegalStateException(
325                     "The connection object has not been set. start() should have been called!"));
326             return f;
327         }
328
329         Command mqttCommandValue = cachedValue.parseCommand(command);
330         Value mqttFormatter = cachedValue;
331
332         if (readOnly) {
333             logger.debug(
334                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
335                     mqttCommandValue, config.commandTopic);
336             return CompletableFuture.completedFuture(false);
337         }
338
339         // Outgoing transformations
340         if (outgoingTransformation.isPresent()) {
341             Command cValue = mqttCommandValue;
342             // Only pass numeric value for QuantityType.
343             if (mqttCommandValue instanceof QuantityType<?> qtCommandValue) {
344                 cValue = new DecimalType(qtCommandValue.toBigDecimal());
345
346             }
347             String commandString = mqttFormatter.getMQTTpublishValue(cValue, "%s");
348             Optional<String> transformedValue = outgoingTransformation.apply(commandString);
349             if (transformedValue.isEmpty()) {
350                 logger.debug("Transformation '{}' returned null on '{}', discarding message", outgoingTransformation,
351                         commandString);
352                 return CompletableFuture.completedFuture(false);
353             }
354
355             mqttFormatter = new TextValue();
356             mqttCommandValue = new StringType(transformedValue.get());
357         }
358
359         String commandString;
360
361         // Formatter: Applied before the channel state value is published to the MQTT broker.
362         if (config.formatBeforePublish.length() > 0) {
363             try {
364                 Command cValue = mqttCommandValue;
365                 // Only pass numeric value for QuantityType of format pattern is %s.
366                 if ((mqttCommandValue instanceof QuantityType<?> qtCommandValue)
367                         && ("%s".equals(config.formatBeforePublish) || "%S".equals(config.formatBeforePublish))) {
368                     cValue = new DecimalType(qtCommandValue.toBigDecimal());
369                 }
370                 commandString = mqttFormatter.getMQTTpublishValue(cValue, config.formatBeforePublish);
371             } catch (IllegalFormatException e) {
372                 logger.debug("Format pattern incorrect for {}", channelUID, e);
373                 commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
374             }
375         } else {
376             commandString = mqttFormatter.getMQTTpublishValue(mqttCommandValue, null);
377         }
378
379         int qos = (config.qos != null) ? config.qos : connection.getQos();
380
381         String commandTopic;
382         if (command.equals(StopMoveType.STOP) && !config.stopCommandTopic.isEmpty()) {
383             commandTopic = config.stopCommandTopic;
384         } else {
385             commandTopic = config.commandTopic;
386         }
387         return connection.publish(commandTopic, commandString.getBytes(), qos, config.retained);
388     }
389
390     /**
391      * @return The channelStateUpdateListener
392      */
393     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
394         return channelStateUpdateListener;
395     }
396
397     /**
398      * @param channelStateUpdateListener The channelStateUpdateListener to set
399      */
400     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
401         this.channelStateUpdateListener = channelStateUpdateListener;
402     }
403
404     public @Nullable MqttBrokerConnection getConnection() {
405         return connection;
406     }
407
408     /**
409      * This is for tests only to inject a broker connection. Use
410      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
411      *
412      * @param connection MQTT Broker connection
413      */
414     public void setConnection(MqttBrokerConnection connection) {
415         this.connection = connection;
416     }
417 }