]> git.basschouten.com Git - openhab-addons.git/blob
fb2a0213d2b41701012006df472cef1b0a723edf
[openhab-addons.git] /
1 /**
2  * Copyright (c) 2010-2022 Contributors to the openHAB project
3  *
4  * See the NOTICE file(s) distributed with this work for additional
5  * information.
6  *
7  * This program and the accompanying materials are made available under the
8  * terms of the Eclipse Public License 2.0 which is available at
9  * http://www.eclipse.org/legal/epl-2.0
10  *
11  * SPDX-License-Identifier: EPL-2.0
12  */
13 package org.openhab.binding.mqtt.generic;
14
15 import java.nio.charset.StandardCharsets;
16 import java.util.ArrayList;
17 import java.util.IllegalFormatException;
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.ScheduledFuture;
22 import java.util.concurrent.TimeUnit;
23
24 import org.eclipse.jdt.annotation.NonNullByDefault;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.openhab.binding.mqtt.generic.values.TextValue;
27 import org.openhab.binding.mqtt.generic.values.Value;
28 import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
29 import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
30 import org.openhab.core.library.types.StringType;
31 import org.openhab.core.thing.ChannelUID;
32 import org.openhab.core.types.Command;
33 import org.openhab.core.types.TypeParser;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * This object consists of an {@link Value}, which is updated on the respective MQTT topic change.
39  * Updates to the value are propagated via the {@link ChannelStateUpdateListener}.
40  *
41  * @author David Graeff - Initial contribution
42  */
43 @NonNullByDefault
44 public class ChannelState implements MqttMessageSubscriber {
45     private final Logger logger = LoggerFactory.getLogger(ChannelState.class);
46
47     // Immutable channel configuration
48     protected final boolean readOnly;
49     protected final ChannelUID channelUID;
50     protected final ChannelConfig config;
51
52     /** Channel value **/
53     protected final Value cachedValue;
54
55     // Runtime variables
56     private @Nullable MqttBrokerConnection connection;
57     protected final List<ChannelStateTransformation> transformationsIn = new ArrayList<>();
58     protected final List<ChannelStateTransformation> transformationsOut = new ArrayList<>();
59     private @Nullable ChannelStateUpdateListener channelStateUpdateListener;
60     protected boolean hasSubscribed = false;
61     private @Nullable ScheduledFuture<?> scheduledFuture;
62     private CompletableFuture<@Nullable Void> future = CompletableFuture.completedFuture(null);
63     private final Object futureLock = new Object();
64
65     /**
66      * Creates a new channel state.
67      *
68      * @param config The channel configuration
69      * @param channelUID The channelUID is used for the {@link ChannelStateUpdateListener} to notify about value changes
70      * @param cachedValue MQTT only notifies us once about a value, during the subscribe. The channel state therefore
71      *            needs a cache for the current value.
72      * @param channelStateUpdateListener A channel state update listener
73      */
74     public ChannelState(ChannelConfig config, ChannelUID channelUID, Value cachedValue,
75             @Nullable ChannelStateUpdateListener channelStateUpdateListener) {
76         this.config = config;
77         this.channelStateUpdateListener = channelStateUpdateListener;
78         this.channelUID = channelUID;
79         this.cachedValue = cachedValue;
80         this.readOnly = config.commandTopic.isBlank();
81     }
82
83     public boolean isReadOnly() {
84         return this.readOnly;
85     }
86
87     /**
88      * Add a transformation that is applied for each received MQTT topic value.
89      * The transformations are executed in order.
90      *
91      * @param transformation A transformation
92      */
93     public void addTransformation(ChannelStateTransformation transformation) {
94         transformationsIn.add(transformation);
95     }
96
97     /**
98      * Add a transformation that is applied for each value to be published.
99      * The transformations are executed in order.
100      *
101      * @param transformation A transformation
102      */
103     public void addTransformationOut(ChannelStateTransformation transformation) {
104         transformationsOut.add(transformation);
105     }
106
107     /**
108      * Clear transformations
109      */
110     public void clearTransformations() {
111         transformationsIn.clear();
112         transformationsOut.clear();
113     }
114
115     /**
116      * Returns the cached value state object of this message subscriber.
117      * <p>
118      * MQTT only notifies us once about a value, during the subscribe.
119      * The channel state therefore needs a cache for the current value.
120      * If MQTT has not yet published a value, the cache might still be in UNDEF state.
121      * </p>
122      */
123     public Value getCache() {
124         return cachedValue;
125     }
126
127     /**
128      * Return the channelUID
129      */
130     public ChannelUID channelUID() {
131         return channelUID;
132     }
133
134     /**
135      * Incoming message from the MqttBrokerConnection
136      *
137      * @param topic The topic. Is the same as the field stateTopic.
138      * @param payload The byte payload. Must be UTF8 encoded text or binary data.
139      */
140     @Override
141     public void processMessage(String topic, byte[] payload) {
142         final ChannelStateUpdateListener channelStateUpdateListener = this.channelStateUpdateListener;
143         if (channelStateUpdateListener == null) {
144             logger.warn("MQTT message received for topic {}, but MessageSubscriber object hasn't been started!", topic);
145             return;
146         }
147
148         if (cachedValue.isBinary()) {
149             cachedValue.update(payload);
150             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
151             receivedOrTimeout();
152             return;
153         }
154
155         // String value: Apply transformations
156         String strValue = new String(payload, StandardCharsets.UTF_8);
157         for (ChannelStateTransformation t : transformationsIn) {
158             String transformedValue = t.processValue(strValue);
159             if (transformedValue != null) {
160                 strValue = transformedValue;
161             } else {
162                 logger.debug("Transformation '{}' returned null on '{}', discarding message", strValue, t.serviceName);
163                 receivedOrTimeout();
164                 return;
165             }
166         }
167
168         // Is trigger?: Special handling
169         if (config.trigger) {
170             channelStateUpdateListener.triggerChannel(channelUID, strValue);
171             receivedOrTimeout();
172             return;
173         }
174
175         Command command = TypeParser.parseCommand(cachedValue.getSupportedCommandTypes(), strValue);
176         if (command == null) {
177             logger.warn("Incoming payload '{}' not supported by type '{}'", strValue,
178                     cachedValue.getClass().getSimpleName());
179             receivedOrTimeout();
180             return;
181         }
182
183         Command postOnlyCommand = cachedValue.isPostOnly(command);
184         if (postOnlyCommand != null) {
185             channelStateUpdateListener.postChannelCommand(channelUID, postOnlyCommand);
186             receivedOrTimeout();
187             return;
188         }
189
190         // Map the string to a command, update the cached value and post the command to the framework
191         try {
192             cachedValue.update(command);
193         } catch (IllegalArgumentException | IllegalStateException e) {
194             logger.warn("Command '{}' from channel '{}' not supported by type '{}': {}", strValue, channelUID,
195                     cachedValue.getClass().getSimpleName(), e.getMessage());
196             receivedOrTimeout();
197             return;
198         }
199
200         if (config.postCommand) {
201             channelStateUpdateListener.postChannelCommand(channelUID, (Command) cachedValue.getChannelState());
202         } else {
203             channelStateUpdateListener.updateChannelState(channelUID, cachedValue.getChannelState());
204         }
205         receivedOrTimeout();
206     }
207
208     /**
209      * Returns the state topic. Might be an empty string if this is a stateless channel (TRIGGER kind channel).
210      */
211     public String getStateTopic() {
212         return config.stateTopic;
213     }
214
215     /**
216      * Return the command topic. Might be an empty string, if this is a read-only channel.
217      */
218     public String getCommandTopic() {
219         return config.commandTopic;
220     }
221
222     /**
223      * Returns the channelType ID which also happens to be an item-type
224      */
225     public String getItemType() {
226         return cachedValue.getItemType();
227     }
228
229     /**
230      * Returns true if this is a stateful channel.
231      */
232     public boolean isStateful() {
233         return config.retained;
234     }
235
236     /**
237      * Removes the subscription to the state topic and resets the channelStateUpdateListener.
238      *
239      * @return A future that completes with true if unsubscribing from the state topic succeeded.
240      *         It completes with false if no connection is established and completes exceptionally otherwise.
241      */
242     public CompletableFuture<@Nullable Void> stop() {
243         final MqttBrokerConnection connection = this.connection;
244         if (connection != null && !config.stateTopic.isBlank()) {
245             return connection.unsubscribe(config.stateTopic, this).thenRun(this::internalStop);
246         } else {
247             internalStop();
248             return CompletableFuture.completedFuture(null);
249         }
250     }
251
252     private void internalStop() {
253         logger.debug("Unsubscribed channel {} form topic: {}", this.channelUID, config.stateTopic);
254         this.connection = null;
255         this.channelStateUpdateListener = null;
256         hasSubscribed = false;
257         cachedValue.resetState();
258     }
259
260     private void receivedOrTimeout() {
261         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
262         if (scheduledFuture != null) { // Cancel timeout
263             scheduledFuture.cancel(false);
264             this.scheduledFuture = null;
265         }
266         future.complete(null);
267     }
268
269     private @Nullable Void subscribeFail(Throwable e) {
270         final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
271         if (scheduledFuture != null) { // Cancel timeout
272             scheduledFuture.cancel(false);
273             this.scheduledFuture = null;
274         }
275         future.completeExceptionally(e);
276         return null;
277     }
278
279     /**
280      * Subscribes to the state topic on the given connection and informs about updates on the given listener.
281      *
282      * @param connection A broker connection
283      * @param scheduler A scheduler to realize the timeout
284      * @param timeout A timeout in milliseconds. Can be 0 to disable the timeout and let the future return earlier.
285      * @return A future that completes with true if the subscribing worked, with false if the stateTopic is not set
286      *         and exceptionally otherwise.
287      */
288     public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
289             int timeout) {
290         synchronized (futureLock) {
291             // if the connection is still the same, the subscription is still present, otherwise we need to renew
292             if ((hasSubscribed || !future.isDone()) && connection.equals(this.connection)) {
293                 return future;
294             }
295             hasSubscribed = false;
296
297             this.connection = connection;
298
299             if (config.stateTopic.isBlank()) {
300                 return CompletableFuture.completedFuture(null);
301             }
302
303             this.future = new CompletableFuture<>();
304         }
305         connection.subscribe(config.stateTopic, this).thenRun(() -> {
306             hasSubscribed = true;
307             logger.debug("Subscribed channel {} to topic: {}", this.channelUID, config.stateTopic);
308             if (timeout > 0 && !future.isDone()) {
309                 this.scheduledFuture = scheduler.schedule(this::receivedOrTimeout, timeout, TimeUnit.MILLISECONDS);
310             } else {
311                 receivedOrTimeout();
312             }
313         }).exceptionally(this::subscribeFail);
314         return future;
315     }
316
317     /**
318      * Return true if this channel has subscribed to its MQTT topics.
319      * You need to call {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} and
320      * have a stateTopic set, to subscribe this channel.
321      */
322     public boolean hasSubscribed() {
323         return this.hasSubscribed;
324     }
325
326     /**
327      * Publishes a value on MQTT. A command topic needs to be set in the configuration.
328      *
329      * @param command The command to send
330      * @return A future that completes with true if the publishing worked and false if it is a readonly topic
331      *         and exceptionally otherwise.
332      */
333     public CompletableFuture<Boolean> publishValue(Command command) {
334         cachedValue.update(command);
335
336         Value mqttCommandValue = cachedValue;
337
338         final MqttBrokerConnection connection = this.connection;
339
340         if (connection == null) {
341             CompletableFuture<Boolean> f = new CompletableFuture<>();
342             f.completeExceptionally(new IllegalStateException(
343                     "The connection object has not been set. start() should have been called!"));
344             return f;
345         }
346
347         if (readOnly) {
348             logger.debug(
349                     "You have tried to publish {} to the mqtt topic '{}' that was marked read-only. You can't 'set' anything on a sensor state topic for example.",
350                     mqttCommandValue, config.commandTopic);
351             return CompletableFuture.completedFuture(false);
352         }
353
354         // Outgoing transformations
355         for (ChannelStateTransformation t : transformationsOut) {
356             String commandString = mqttCommandValue.getMQTTpublishValue(null);
357             String transformedValue = t.processValue(commandString);
358             if (transformedValue != null) {
359                 Value textValue = new TextValue();
360                 textValue.update(new StringType(transformedValue));
361                 mqttCommandValue = textValue;
362             } else {
363                 logger.debug("Transformation '{}' returned null on '{}', discarding message", mqttCommandValue,
364                         t.serviceName);
365                 return CompletableFuture.completedFuture(false);
366             }
367         }
368
369         String commandString;
370
371         // Formatter: Applied before the channel state value is published to the MQTT broker.
372         if (config.formatBeforePublish.length() > 0) {
373             try {
374                 commandString = mqttCommandValue.getMQTTpublishValue(config.formatBeforePublish);
375             } catch (IllegalFormatException e) {
376                 logger.debug("Format pattern incorrect for {}", channelUID, e);
377                 commandString = mqttCommandValue.getMQTTpublishValue(null);
378             }
379         } else {
380             commandString = mqttCommandValue.getMQTTpublishValue(null);
381         }
382
383         int qos = (config.qos != null) ? config.qos : connection.getQos();
384
385         return connection.publish(config.commandTopic, commandString.getBytes(), qos, config.retained);
386     }
387
388     /**
389      * @return The channelStateUpdateListener
390      */
391     public @Nullable ChannelStateUpdateListener getChannelStateUpdateListener() {
392         return channelStateUpdateListener;
393     }
394
395     /**
396      * @param channelStateUpdateListener The channelStateUpdateListener to set
397      */
398     public void setChannelStateUpdateListener(ChannelStateUpdateListener channelStateUpdateListener) {
399         this.channelStateUpdateListener = channelStateUpdateListener;
400     }
401
402     public @Nullable MqttBrokerConnection getConnection() {
403         return connection;
404     }
405
406     /**
407      * This is for tests only to inject a broker connection. Use
408      * {@link #start(MqttBrokerConnection, ScheduledExecutorService, int)} instead.
409      *
410      * @param connection MQTT Broker connection
411      */
412     public void setConnection(MqttBrokerConnection connection) {
413         this.connection = connection;
414     }
415 }